Implementing Co, a Small Language With Coroutines #4: Adding Channels
In the previous post, we added coroutines to Co, the small language we are implementing in this series of posts. In this post, we add channels to it so that coroutines can communicate with each other.
- Implementing Co #1: The Parser
- Implementing Co #2: The Interpreter
- Implementing Co #3: Adding Coroutines
- Implementing Co #4: Adding Channels
- Implementing Co #5: Adding Sleep
Introduction
With coroutines, we can now have multiple Threads of Computation (ToCs) in a Co program. However, right now these ToCs work completely independently of each other. Often in such concurrent systems, we need to communicate between these ToCs; for example, one coroutine may produce some data that other coroutines need to consume, or one coroutine may need to wait for another coroutine to complete some task before it can proceed. For that, we need synchronization between coroutines.
There are various ways to synchronize ToCs: Locks, Semaphores, Promises, Actors, Channels, Software Transactional Memory, etc. In particular, channels are commonly used with coroutines for synchronization in languages like Go, Kotlin, and Python, and we are going to do the same.
Channels are a synchronization primitive based on Communicating Sequential Processes (CSP). CSP is a formal language for describing patterns of interaction between concurrent processes. In CSP, processes communicate with each other by sending and receiving messages over channels.
A process can send a message to a channel only if the channel is not full, and blocks otherwise. Similarly, a process can receive a message from a channel only if the channel is not empty, blocking otherwise. Thus, channels provide a way for processes to synchronize with each other, and at the same time, communicate by passing messages.
Before we implement channels, we have to decide how they are going to work.
Channel Design
There are various design decisions that we need to make while implementing channels. Depending on what we choose, we end up with different kinds of channels. Some of the major design decisions are:

- Buffered vs Unbuffered: A buffered channel has a buffer to store messages. A send operation on a buffered channel succeeds if the buffer is not full, even if there are no pending receive operations. On the other hand, a send operation on an unbuffered channel blocks until the message is received by some other process. For example, in Java, LinkedBlockingQueue is a buffered channel, while SynchronousQueue is an unbuffered channel [1].
- Bounded vs Unbounded: A bounded channel has a buffer of fixed capacity, and can hold only a fixed number of messages at maximum. A send operation on a bounded channel blocks if the buffer is full and there are no pending receive operations. An unbounded channel has a buffer with no fixed capacity, and can hold any number of messages. A send operation on an unbounded channel never blocks. For example, in Java, ArrayBlockingQueue is a bounded channel, while LinkedBlockingQueue is an unbounded one.
- Synchronous vs Asynchronous: A synchronous channel blocks on send until the message is received by some other process, even if the channel has an unbounded buffer. An asynchronous channel does not block on send if the channel's buffer has space. For example, in Java, LinkedTransferQueue is a synchronous channel, while ArrayBlockingQueue is an asynchronous one.
- Blocking vs Non-blocking: A blocking channel blocks on send if the channel's buffer is full, and on receive if it is empty. A non-blocking channel never blocks on send or receive; instead it returns a sentinel value (usually a null value) or throws an error to indicate that the operation could not be executed. For example, in Java, BlockingQueue.put is a blocking send operation, while BlockingQueue.offer is a non-blocking one.
- Fair vs Unfair: A fair channel ensures that the order of sends and receives is preserved. That is, if there are multiple pending sends and receives, they are executed in the order in which they were requested. An unfair channel does not guarantee any order. For example, in Java, ArrayBlockingQueue supports fair and unfair modes via a boolean flag passed to its constructor.
- Locking vs Lock-free: A locking channel uses locks to synchronize access to the channel. A lock-free channel uses atomic operations for the same purpose. For example, in Java, LinkedBlockingQueue is a locking channel, while ConcurrentLinkedQueue is a lock-free one.
- Selectable vs Non-selectable: A selectable channel can be used in a Select-like operation to wait for a message on multiple channels at once. A non-selectable channel cannot be used in such an operation. For example, channels in Go and Clojure core.async are selectable, while the aforementioned Java queues are not.
In our implementation for Co, we have both buffered and unbuffered channels. The buffered channels are bounded, with a fixed capacity. The channels are asynchronous, blocking, fair, lock-free, and non-selectable.
Enough of theory, let’s see how channels work in Co.
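To get an early taste, here is a small sketch of the kind of Co program we will be able to write by the end of this post. It uses the newChannel built-in function and the -> (send) and <- (receive) operators, all of which are implemented in the sections below:

var chan = newChannel();

spawn (function () {
  // this send completes only when another coroutine receives
  "hello" -> chan;
})();

// this receive blocks until the spawned coroutine sends
var message = <- chan;
print(message);

The receive blocks first because nothing has been sent yet; the interpreter then switches to the spawned coroutine, whose send finds the pending receive and resumes it with the message, printing hello.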
Channel Operations
In this section, we explore the various scenarios for send and receive operations on a channel in Co using diagrams. The diagrams are for buffered channels. For unbuffered channels, a send operation behaves as it does for a buffered channel whose buffer is full, and a receive operation behaves as it does for one whose buffer is empty.
Each channel has three internal queues: a send queue, a receive queue, and a buffer [2]. The send and receive queues store pending send and receive operations (as coroutines) respectively. The buffer stores the data of the messages. The send and receive queues are always bounded, because otherwise any number of send and receive operations could be blocked on a channel, defeating the point of having a bounded buffer. In extreme cases, it could cause the program to run out of memory.
The invariants we must maintain for the channel operations are:
- There can never be pending send operations while there are pending receive operations, and vice versa. This is because a send operation will complete immediately if there are pending receive operations, and vice versa.
- There can never be pending receive operations while there are messages in the buffer. This is because a receive operation will complete immediately by dequeuing the oldest message in the buffer.
- There can never be pending send operations while there is room in the buffer. This is because a send operation will complete immediately by enqueuing the message in the buffer.
With these invariants in mind, let’s look at the different scenarios in detail:
- When a program tries to receive from a channel, and the channel has nothing in its buffer and there are no pending sends, the program blocks. The program's continuation is captured as a coroutine, and is enqueued to the receive queue. Note that the coroutine is not queued into the interpreter's global coroutine queue.
- The corresponding scenario for a send operation is when the channel has pending receives. In this case, the send operation completes immediately, and the first coroutine in the receive queue is dequeued and resumed with the message.
- When there are no pending receives and the buffer is not full, the message is enqueued to the buffer, and the send operation completes immediately.
- In the corresponding scenario for a receive operation, when there are no pending sends, and there are messages in the buffer, the oldest message is dequeued, and the receive operation completes immediately with it.
- When the buffer is full, the program trying to do a send operation is blocked and its continuation is captured as a coroutine and queued into the send queue. Note that the coroutine is not queued into the interpreter’s global coroutine queue.
- In the corresponding scenario for a receive operation, when the buffer is full, the oldest message is dequeued from the buffer, and the receive operation completes immediately with it. If there are pending sends, the oldest coroutine in the send queue is dequeued and resumed, and its message is enqueued to the buffer.
- When the send queue is full and the buffer is full as well, an error is thrown when trying to do a send operation.
- Similarly, when the receive queue is full and the buffer is empty, an error is thrown when a receive operation is attempted.
That captures all scenarios for send and receive operations on a channel. In the next section, we implement channels in Co.
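Before we do that, here is a small sketch of a Co program, written against the implementation we build below, that exercises most of these scenarios. A sender coroutine sends four messages over a channel with a buffer of capacity two, while the main program receives them:

var chan = newBufferedChannel(2);

function sender() {
  var i = 1;
  while (i < 5) {
    print("sending: " + i);
    i -> chan;
    print("sent: " + i);
    i = i + 1;
  }
}

spawn sender();

var x = <- chan;
print("received: " + x);
x = <- chan;
print("received: " + x);
x = <- chan;
print("received: " + x);
x = <- chan;
print("received: " + x);

Tracing through the scenarios above: the first receive blocks because the buffer is empty and there are no pending sends, so the sender coroutine runs. Its first send finds the pending receive and completes immediately, the next two sends go into the buffer, and the fourth send finds the buffer full and blocks. When the main program is resumed and receives again, the blocked sender is dequeued from the send queue, its message is moved to the buffer, and it is scheduled to run again.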
Adding Channels
Let's start by defining the Channel type:
data Channel = Channel
  { channelCapacity :: Int,
    channelBuffer :: Queue Value,
    channelSendQueue :: Queue (Coroutine (), Value),
    channelReceiveQueue :: Queue (Coroutine Value)
  }

newChannel :: Int -> Interpreter Channel
newChannel size = Channel size <$> newQueue <*> newQueue <*> newQueue
A channel has a buffer, a send queue, and a receive queue. The buffer is a queue of Co values, the receive queue is a queue of coroutines, and the send queue is a queue of coroutine and value pairs. A channel also has a capacity, which is the capacity of the buffer [3].
Now, we add Channel to the Value type:
data Value
  = Null
  | Boolean Bool
  | Str String
  | Num Integer
  | Function Identifier [Identifier] [Stmt] Env
  | BuiltinFunction Identifier Int ([Expr] -> Interpreter Value)
  | Chan Channel
Finally, we introduce some new built-in functions to create channels:
builtinEnv :: IO Env
builtinEnv = Map.fromList <$> traverse (traverse newIORef) [
    ("print", BuiltinFunction "print" 1 executePrint)
  , ("newChannel",
      BuiltinFunction "newChannel" 0 $ fmap Chan . const (newChannel 0))
  , ("newBufferedChannel",
      BuiltinFunction "newBufferedChannel" 1 executeNewBufferedChannel)
  , ("sleep", BuiltinFunction "sleep" 1 executeSleep)
  , ("getCurrentMillis",
      BuiltinFunction "getCurrentMillis" 0 executeGetCurrentMillis)
  ]
The newChannel function creates an unbuffered channel, and the newBufferedChannel function creates a buffered channel with the given capacity:
executeNewBufferedChannel :: [Expr] -> Interpreter Value
executeNewBufferedChannel argEs = evaluate (head argEs) >>= \case
  Num capacity | capacity >= 0 -> Chan <$> newChannel (fromIntegral capacity)
  _ -> throw "newBufferedChannel call expected a positive number argument"
Wiring Channels
Moving on, let's wire the channels into the existing interpreter implementation. First, we add a new constructor for send statements to the Stmt type:
data Stmt
  = ExprStmt Expr
  | VarStmt Identifier Expr
  | AssignStmt Identifier Expr
  | IfStmt Expr [Stmt]
  | WhileStmt Expr [Stmt]
  | FunctionStmt Identifier [Identifier] [Stmt]
  | ReturnStmt (Maybe Expr)
  | YieldStmt
  | SpawnStmt Expr
  | SendStmt Expr Expr
  deriving (Show, Eq)
type Program = [Stmt]
And another for receive expressions to the Expr type:
data Expr
  = LNull
  | LBool Bool
  | LStr String
  | LNum Integer
  | Variable Identifier
  | Binary BinOp Expr Expr
  | Call Expr [Expr]
  | Lambda [Identifier] [Stmt]
  | Receive Expr
  deriving (Show, Eq)
type Identifier = String
We have already written the code to parse these statements and expressions in the first post, so that's taken care of. We need to modify the execute and evaluate functions to handle the new statement and expression. Let's start with execute:
execute :: Stmt -> Interpreter ()
execute = \case
  ExprStmt expr -> void $ evaluate expr
  VarStmt name expr -> evaluate expr >>= defineVar name
  AssignStmt name expr -> evaluate expr >>= assignVar name
  IfStmt expr body -> do
    cond <- evaluate expr
    when (isTruthy cond) $
      traverse_ execute body
  while@(WhileStmt expr body) -> do
    cond <- evaluate expr
    when (isTruthy cond) $ do
      traverse_ execute body
      execute while
  ReturnStmt mExpr -> do
    mRet <- traverse evaluate mExpr
    throwError . Return . fromMaybe Null $ mRet
  FunctionStmt name params body -> do
    env <- State.gets isEnv
    defineVar name $ Function name params body env
  YieldStmt -> yield
  SpawnStmt expr -> spawn expr
  SendStmt expr chan -> evaluate chan >>= \case
    Chan channel -> do
      val <- evaluate expr
      channelSend val channel
    v -> throw $ "Cannot send to a non-channel: " <> show v
  where
    isTruthy = \case
      Null -> False
      Boolean b -> b
      _ -> True
To execute a SendStmt, we evaluate its arguments to get the channel and the value to send. Then we call the channelSend function to send the value over the channel.
Similarly, to evaluate a Receive expression, we evaluate its argument to get the channel, and then call the channelReceive function to receive a value from the channel:
evaluate :: Expr -> Interpreter Value
evaluate = \case
  LNull -> pure Null
  LBool bool -> pure $ Boolean bool
  LStr str -> pure $ Str str
  LNum num -> pure $ Num num
  Variable v -> lookupVar v
  Lambda params body -> Function "<lambda>" params body <$> State.gets isEnv
  binary@Binary {} -> evaluateBinaryOp binary
  call@Call {} -> evaluateFuncCall call
  Receive expr -> evaluate expr >>= \case
    Chan channel -> channelReceive channel
    val -> throw $ "Cannot receive from a non-channel: " <> show val
Now comes the core of the implementation: the channelSend and channelReceive functions. Let's look into them in detail.
Sending and Receiving
The channelSend function takes a value and a channel, and sends the value over the channel, blocking if necessary.
channelSend :: Value -> Channel -> Interpreter ()
channelSend value Channel {..} = do
  bufferSize <- queueSize channelBuffer
  sendQueueSize <- queueSize channelSendQueue

  dequeue channelReceiveQueue >>= \case
    -- there are pending receives
    Just coroutine@Coroutine {..} ->
      scheduleCoroutine $ coroutine { corCont = const $ corCont value }
    -- there are no pending receives and the buffer is not full
    Nothing | channelCapacity > 0 && bufferSize < channelCapacity ->
      enqueue value channelBuffer
    -- there are no pending receives and
    -- (the buffer is full or the channel is unbuffered)
    Nothing | sendQueueSize < maxSendQueueSize -> do
      env <- State.gets isEnv
      callCC $ \cont -> do
        coroutine <- newCoroutine env cont
        enqueue (coroutine, value) channelSendQueue
        runNextCoroutine
    -- the send queue is full
    Nothing -> throw "Channel send queue is full"
  where
    maxSendQueueSize = 4
This is a direct implementation of the algorithm we discussed earlier using diagrams. We try to dequeue a coroutine from the receive queue. Then:
- If there is a coroutine, we schedule it to be run with the sent value. The send call does not block.
- If there is no coroutine, and
  - the channel is buffered and the buffer is not full, we enqueue the sent value to the buffer. The send call does not block.
  - the buffer is full, we create a new coroutine with the current continuation, and enqueue the coroutine and the value to the send queue. The send call blocks.
- If the send queue is full, we throw an error.
Next, let's write the channelReceive function:
channelReceive :: Channel -> Interpreter Value
channelReceive Channel {..} = do
  mSend <- dequeue channelSendQueue
  mBufferedValue <- dequeue channelBuffer
  receiveQueueSize <- queueSize channelReceiveQueue

  case (mSend, mBufferedValue) of
    -- the channel is unbuffered and there are pending sends
    (Just (sendCoroutine, sendValue), Nothing) -> do
      scheduleCoroutine sendCoroutine
      return sendValue
    -- the buffer is full and there are pending sends
    (Just (sendCoroutine, sendValue), Just bufferedValue) -> do
      scheduleCoroutine sendCoroutine
      enqueue sendValue channelBuffer
      return bufferedValue
    -- the buffer is empty and there are no pending sends
    (Nothing, Nothing) | receiveQueueSize < maxReceiveQueueSize -> do
      env <- State.gets isEnv
      callCC $ \receive -> do
        coroutine <- newCoroutine env receive
        enqueue coroutine channelReceiveQueue
        runNextCoroutine
        return Null
    -- the receive queue is full
    (Nothing, Nothing) -> throw "Channel receive queue is full"
    -- the buffer is not empty and there are no pending sends
    (Nothing, Just bufferedValue) -> return bufferedValue
  where
    maxReceiveQueueSize = 4
This is also a straightforward implementation of the algorithm. We try to dequeue a coroutine and its value from the send queue, and another value from the buffer. Then:
- If there is a coroutine,
  - but no buffered value, we schedule the coroutine to be resumed, and return its value. The returned value becomes the value that is received from the channel. The receive call does not block.
  - and a buffered value, we schedule the coroutine to be resumed, enqueue its value to the buffer, and return the buffered value. The receive call does not block.
- If there is no coroutine and no buffered value, and the receive queue is not full, we create a new coroutine with the current continuation, and enqueue it to the receive queue. The receive call blocks.
- If the receive queue is full, we throw an error.
We hardcode the capacity of the send and receive queues to 4.
That’s it for the implementation of channels. Since we broke down the scenarios for send and receive operations, the implementation is not complicated. Let’s see it in action next.
Pubsub using Channels
In this demo, we implement a pubsub system using channels. The pubsub system consists of a server and a set of workers. The server sends messages to the workers over a channel. The workers print the messages and send acks back to the server over another channel. After sending all the messages, the server waits for all the acks from the workers, and then stops the workers.
Diagrammatically, the pubsub system looks like this:
The boxes with double borders are ToCs, and the ones with single borders are channels. The arrows show how the ToCs and channels are connected.
Pubsub code
// server sends messages to workers.
function startServer(messageCount, messageChan) {
  print("server starting");
  var i = 1;
  while (i < messageCount + 1) {
    print("server sending: " + i);
    i -> messageChan;
    print("server sent: " + i);
    i = i + 1;
  }
}

// workers receive messages over a channel, print them,
// and send an ack back to the sender on a channel.
function worker(name, messageChan, ackChan) {
  print("worker " + name + " starting");
  var message = null;
  while (true) {
    message = <- messageChan;
    print("worker " + name + " received: " + message);
    if (message == null) {
      print("worker " + name + " stopped");
      return;
    }
    print("worker " + name + " sending: " + message);
    message -> ackChan;
    print("worker " + name + " sent: " + message);
  }
}

// start workers.
function startWorkers(workerCount, messageChan, ackChan) {
  print("workers starting");
  var i = 1;
  while (i < workerCount + 1) {
    spawn function(name) {
      worker(name, messageChan, ackChan);
    }(i);
    i = i + 1;
  }
  print("workers scheduled to be started");
}

// server waits for acks from workers.
function waitForWorkers(messageCount, ackChan, doneChan) {
  print("server waiting for acks");
  var i = 1;
  var message = null;
  while (i < messageCount + 1) {
    message = <- ackChan;
    print("server received: " + message);
    i = i + 1;
  }
  print("server received all acks");
  null -> doneChan;
}

// stop workers.
function stopWorkers(workerCount, messageChan, doneChan) {
  var done = <- doneChan;
  print("workers stopping");
  var i = 1;
  while (i < workerCount + 1) {
    null -> messageChan;
    i = i + 1;
  }
  print("workers scheduled to be stopped");
}

var workerCount = 3;
var messageCount = 7;
var messageBufferSize = 5;
var ackBufferSize = 1;

var messageChan = newBufferedChannel(messageBufferSize);
var ackChan = newBufferedChannel(ackBufferSize);
var doneChan = newChannel();

startWorkers(workerCount, messageChan, ackChan);
spawn waitForWorkers(messageCount, ackChan, doneChan);
startServer(messageCount, messageChan);
stopWorkers(workerCount, messageChan, doneChan);
Running the program produces this output:
Pubsub output
workers starting
workers scheduled to be started
server starting
server sending: 1
server sent: 1
server sending: 2
server sent: 2
server sending: 3
server sent: 3
server sending: 4
server sent: 4
server sending: 5
server sent: 5
server sending: 6
worker 1 starting
worker 1 received: 1
worker 1 sending: 1
worker 1 sent: 1
worker 1 received: 2
worker 1 sending: 2
worker 2 starting
worker 2 received: 3
worker 2 sending: 3
worker 3 starting
worker 3 received: 4
worker 3 sending: 4
server waiting for acks
server received: 1
server received: 2
server received: 3
server received: 4
server sent: 6
server sending: 7
server sent: 7
worker 1 sent: 2
worker 1 received: 5
worker 1 sending: 5
worker 1 sent: 5
worker 1 received: 6
worker 1 sending: 6
worker 1 sent: 6
worker 1 received: 7
worker 1 sending: 7
worker 2 sent: 3
worker 3 sent: 4
server received: 5
server received: 6
server received: 7
server received all acks
worker 1 sent: 7
workers stopping
workers scheduled to be stopped
worker 2 received: null
worker 2 stopped
worker 3 received: null
worker 3 stopped
worker 1 received: null
worker 1 stopped
The output shows how the server and worker coroutines yield control to each other when they are waiting for messages or acks [4].
Bonus Round: Emulating Actors
The Actor model is a concurrent programming paradigm where computation is carried out by lightweight processes called Actors that can only communicate with each other by sending messages. This makes them ideal for building concurrent and distributed systems.
In this section, we emulate actors in Co using channels:
function start(process) {
  var inbox = newChannel();
  spawn (function () {
    var val = null;
    while (true) {
      val = <- inbox;
      if (val == null) { return; }
      process(val);
    }
  })();
  return function (message) { message -> inbox; };
}
function send(actor, message) { actor(message); }
function stop(actor) { actor(null); }
Actors are implemented as wrappers around channels. By sending messages to an actor’s channel, we can send messages to the actor. However, we cannot expose the channels directly, so we wrap them in functions.
The start function creates and starts an actor: it creates a new channel, and spawns a coroutine that receives messages from the channel in a loop and passes them to the process function taken as a parameter by start. Upon receiving a null value, the coroutine returns, which stops the actor.

The start function returns a function for sending messages to the actor, which works by sending the messages to the actor's channel.
The send function is a convenience function to send a message to an actor. The stop function stops an actor by sending it a null message.
It was easy, wasn’t it? Now let’s use actors in some different ways.
Let’s start with a simple example of an actor that prints the received messages:
var printer = start(print);
spawn send(printer, "world");
send(printer, "hello");
stop(printer);
The process parameter here is the print function. Running this program produces the following output:
hello
world
Next, let's write an actor that counts. For that, we first need to create a 2-tuple data structure using closures, named Pair [5]:
function Pair(first, second) {
  return function (command) {
    if (command == "first") { return first; }
    if (command == "second") { return second; }
    return null;
  };
}

function first(pair) { return pair("first"); }
function second(pair) { return pair("second"); }
Now we implement the counter actor:
function makeCounter() {
  var value = 0;
  return start(function (message) {
    var command = first(message);
    var arg = second(message);
    if (command == "inc") { value = value + arg; }
    if (command == "get") { send(arg, value); }
  });
}
The makeCounter function creates a counter actor. The counter actor is started with a processing function that takes a message as a Pair, extracts the command and the argument from the message, and either increments the counter value or sends the counter value back, depending on the command. We exercise the counter like this:
var printer = start(print);
var counter1 = makeCounter();
send(counter1, Pair("inc", 1));
send(counter1, Pair("get", printer));
send(counter1, Pair("inc", 2));
send(counter1, Pair("get", printer));
stop(counter1);
var counter2 = makeCounter();
send(counter2, Pair("inc", 5));
send(counter2, Pair("get", printer));
stop(counter2);
stop(printer);
The output of the program is:
1
3
5
And for the grand finale, let’s reimplement the ping-pong program using actors:
function makePingPonger(name) {
  var self = null;
  function pingPong(message) {
    var value = first(message);
    var other = second(message);
    if (value == "done") {
      print(name + " done");
      spawn (function () { stop(self); }());
      return;
    }
    print(name + " " + value);
    if (value == 0) {
      print(name + " done");
      send(other, Pair("done", self));
      spawn (function () { stop(self); }());
      return;
    }
    send(other, Pair(value - 1, self));
  }
  self = start(pingPong);
  return self;
}
The makePingPonger function creates a ping-ponger actor. The ping-ponger actor is started with a processing function that takes a message as a Pair of the value to print and the other actor to send the next message to. The processing function prints the value, decrements it, and sends it to the other actor. If the value is 0, it sends a done message to the other actor and stops itself. If the value is done, it stops itself.
Upon running it like this:
var pinger = makePingPonger("ping");
var ponger = makePingPonger("pong");
send(pinger, Pair(10, ponger));
It produces the same output as the original ping-pong program:
ping 10
pong 9
ping 8
pong 7
ping 6
pong 5
ping 4
pong 3
ping 2
pong 1
ping 0
ping done
pong done
In this post, we added channels to Co, and used them to create a variety of concurrent programs. We learned about CSP and how to implement it using coroutines and channels. In the next post, we will add support for sleep to Co.
The code for the complete Co interpreter is available here.
Acknowledgements
Many thanks to Steven Deobald for reviewing a draft of this article.
If you have any questions or comments, please leave a comment below. If you liked this post, please share it. Thanks for reading!
[1] Recently, Java added support for Virtual Threads, which, though not cooperatively scheduled like coroutines, are scheduled by the JVM and are very lightweight. With virtual threads, the various Java queues can be considered channels as defined in CSP.

[2] The design of channels in Co is inspired by the design of channels in Clojure core.async. It is a simplified version, not supporting some of the features of core.async, such as transducers and alts.

[3] Recall that the Queue type is an immutable queue data structure wrapped in an IORef, which we manipulate using the atomic operation atomicModifyIORef'.

[4] You can try running the program with different values for the workerCount, messageCount, messageBufferSize, and ackBufferSize variables to see how it behaves. You can also try changing the order of the function calls at the end of the program, or prefixing them with spawn, to see how it affects the output. In some cases, the program may deadlock and hang, and in some other cases, it may throw an error. Try to understand why.

[5] We used the same trick to create a binary tree data structure in the previous post.