Implementing Go-style Channels in C++ from scratch
Learn about C++'s powerful concurrency primitives by building an intuitive inter-thread messaging channel.
If you've ever worked with Go, you know how channels simplify concurrent programming. They provide a clean, safe way for goroutines to communicate and synchronize. Another useful feature is the for-select loop, which lets you listen on multiple channels concurrently.
C++ provides some extremely powerful concurrency primitives, but nothing in its standard library is as intuitive and easy to use as channels. It doesn't have to be that way.
I recently wrote a small library called CppChan, which allows you to use Go-like channels and for-select loops inside C++ (albeit with C++-like syntax).
We’ll go through the library’s implementation, which uses nothing but the language’s built-in concurrency primitives. By the end of this article, you will have a deep understanding of this library and a strong grasp of C++’s concurrency model.
The Channel API in Go
Go's concurrency model is built around goroutines and channels, providing a powerful yet straightforward way to manage concurrent operations. Goroutines are lightweight threads managed by the Go runtime, allowing you to execute functions concurrently without the overhead of traditional threads. Channels are Go's mechanism for inter-goroutine communication, enabling safe data exchange between goroutines.
You can create channels with the make function, and send/receive messages using the <- operator.
The select statement allows a goroutine to wait on multiple communication operations, proceeding with whichever is ready first. This is particularly useful for managing multiple concurrent tasks by enabling the program to react dynamically to various events.
Buffered and Unbuffered Channels
Channels can be either buffered or unbuffered. Unbuffered channels, created with make(chan Type), require both the sender and receiver to be ready at the same time for the communication to occur. This ensures a synchronous exchange, where the sender blocks until the receiver is ready to receive, and vice versa. This tight coupling is useful for ensuring that data is passed immediately and allows for precise synchronization between goroutines.
Buffered channels, on the other hand, are created with make(chan Type, capacity), where capacity specifies the number of elements the channel can hold before blocking. In buffered channels, the sender can send up to capacity elements without waiting for a receiver, making the communication asynchronous. Once the buffer is full, the sender blocks until a receiver reads an element from the channel. Buffered channels are useful for decoupling the sender and receiver, allowing the sender to proceed with other tasks without being immediately blocked by the receiver's readiness. They provide a way to handle bursty data flows and can help in balancing workloads between goroutines, making them a versatile tool in Go's concurrency model.
Our C++ API
CppChan provides an API for channel-based concurrency, similar to Go's channels and select statements. It offers a Channel class template and a Selector class.
The Channel<T> class template implements both buffered and unbuffered channels. It provides methods for sending and receiving data, including:
Blocking operations: send, receive
Non-blocking operations: try_send, try_receive
Asynchronous operations: async_send, async_receive
Channels can be closed to signal that no more values will be sent, and their state can be queried using methods like is_closed(), is_empty(), and size(). This allows developers to implement various concurrency patterns and handle different scenarios efficiently.
The Selector class implements functionality similar to Go's select statement and allows simultaneous monitoring of multiple channels. It provides a select method that blocks until a message is available on any of the registered channels, and then processes it using a specified callback function. This enables efficient handling of multiple concurrent operations, similar to Go's for-select loop. The select loop can be terminated by calling the stop() method on the selector.
The Selector’s API exposes three main methods:
add_receive(): Adds a channel to be monitored, along with a callback function (which is executed whenever a message is received on the channel).
select(): The core method that waits for and processes data from the channels.
stop(): Signals the select operation to stop.
There's also a notify() method meant for internal use by the channels to notify the selector about updates.
This approach lets a single thread multiplex receives across multiple channels, making it easier to manage complex concurrent systems in C++.
A Working Example
Before we dissect the library's internals, it helps to see it in action.
This example demonstrates the usage of our Channel and Selector structures to create a multi-producer, single-consumer scenario with different types of channels. Let's break it down:
We instantiate two buffered channels with distinct data types: Channel<int> and Channel<std::string>.
We create a Selector that will be used by the consumer to listen on the two channels.
Two producer functions are defined:
int_producer: Produces integers and sends them to an int channel.
string_producer: Produces strings and sends them to a string channel.
Both producers:
Run for a specified number of iterations.
Use try_send to attempt sending without blocking.
Log successful and failed send attempts.
The consumer function:
Adds receive callbacks for both int and string channels to the selector. This is similar to listening on multiple channels in Go.
Calls selector.select() to continuously process incoming data until signaled to stop by the main thread.
In the main function:
Two channels are created: ch_int and ch_str, both with a capacity of 5.
One consumer thread and three producer threads (two for ints, one for strings) are started.
The main thread waits for all producer threads to finish.
The main thread calls the stop() method on the selector to exit the select loop.
The main thread waits for the consumer thread to finish.
Channel Internals
The most basic building blocks of our Channels are the send and receive methods. Let's begin by understanding their behavior.
send
void send(const T& value);
The send method is responsible for sending a value to the channel. It blocks if the channel is full (buffered) or if there's no receiver (unbuffered).
Locking and Initial Check:
The function starts by acquiring a lock on the mutex using std::unique_lock.
It immediately checks if the channel is closed, throwing an exception if so.
Buffered vs Unbuffered Channels: The function behaves differently based on whether the channel is buffered (capacity > 0) or unbuffered (capacity == 0).
For unbuffered channels:
It waits until there's a receiver or the channel is closed.
After waiting, it checks again whether the channel was closed while waiting.
If not closed, it decreases the waiting receivers count and pushes the value to the queue.
For buffered channels:
It waits until there's space in the buffer or the channel is closed.
After waiting, it checks again whether the channel was closed while waiting.
If not closed, it pushes the value to the queue.
Notification:
It notifies one waiting receiver using cv_recv.notify_one().
It notifies all registered selectors (which may be listening on this channel).
The lock is automatically released when the function exits, thanks to std::unique_lock.
Condition variables (std::condition_variable) are synchronization primitives used in conjunction with mutexes to block a thread until a certain condition is met or a notification is received from another thread. They're particularly useful for producer-consumer scenarios, which is exactly what we're doing here.
Here we're using two condition variables: cv_send and cv_recv.
The wait function on a condition variable (e.g., cv_send.wait(lock, ...)) does the following:
It first checks the condition. If it is already true, wait returns immediately.
Otherwise, it atomically releases the lock (remember, std::unique_lock allows this behavior) and puts the thread to sleep.
When awakened, it reacquires the lock and checks the condition again.
If the condition is false, it goes back to sleep. If true, it continues execution.
The condition is provided as a lambda function. For example:
cv_send.wait(lock, [this] { return waitingReceivers > 0 || closed; });
This waits until there are waiting receivers or the channel is closed.
After the wait, the code uses cv_recv.notify_one() to wake up one waiting receiver thread.
Using condition variables allows us to efficiently wait for specific conditions without busy-waiting, and to wake up other threads when those conditions are met.
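To make the wait/notify handshake concrete, here is a minimal sketch that is independent of the library. The function name wait_for_signal and the variables ready, payload and observed are made up for illustration; only the standard mutex, lock and condition variable types are real.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: a worker thread waits on a predicate while the caller
// sets the flag and notifies. Returns the value the worker saw after waking.
int wait_for_signal() {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;  // the condition being waited on
    int payload = 0;     // data protected by mtx
    int observed = -1;

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(mtx);
        // The predicate is checked before sleeping and after every wake-up,
        // so a spurious wake-up or an early notification is harmless.
        cv.wait(lock, [&] { return ready; });
        observed = payload;
    });

    {
        // Change the shared state under the same mutex the waiter uses.
        std::lock_guard<std::mutex> lock(mtx);
        payload = 42;
        ready = true;
    }
    cv.notify_one();  // wake the waiter (if it is already asleep)
    waiter.join();
    return observed;
}
```

Calling wait_for_signal() returns 42 whether the notification arrives before or after the worker starts waiting, because the predicate overload of wait looks at ready before it ever sleeps.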
receive
std::optional<T> receive();
The receive method is responsible for receiving a value from the channel. It blocks if the channel is empty.
The function begins by acquiring a lock on the mutex using std::unique_lock.
The function behaves differently based on whether the channel is buffered (capacity > 0) or unbuffered (capacity == 0).
Unbuffered Channel Logic
Increment the count of waiting receivers.
Notify one potential sender that a receiver is ready.
Wait until there's a value in the queue or the channel is closed.
After waiting, decrease the count of waiting receivers.
Buffered Channel Logic
Simply wait until there's a value in the queue or the channel is closed.
If the queue is empty and the channel is closed, return std::nullopt. This indicates that no more values will be available on this channel.
If there's a value, retrieve it from the front of the queue.
Notify one waiting sender that space is now available in the queue.
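The send and receive steps described so far can be sketched in one small class. MiniChannel and roundtrip are hypothetical names written for illustration, not CppChan's actual source. The sketch leaves out selectors, and it makes one bookkeeping choice of its own: only the sender decrements the waiting-receiver count when it claims a receiver, and a receiver decrements it only when it wakes up because the channel was closed.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// MiniChannel: hypothetical, simplified stand-in for the library's Channel<T>.
template <typename T>
class MiniChannel {
public:
    explicit MiniChannel(std::size_t capacity = 0) : capacity_(capacity) {}

    void send(const T& value) {
        std::unique_lock<std::mutex> lock(mtx_);
        if (closed_) throw std::runtime_error("send on closed channel");
        if (capacity_ == 0) {
            // Unbuffered: wait for a receiver that has not been claimed yet.
            cv_send_.wait(lock, [this] { return waiting_receivers_ > 0 || closed_; });
            if (closed_) throw std::runtime_error("send on closed channel");
            --waiting_receivers_;  // claim one waiting receiver
        } else {
            // Buffered: wait for free space.
            cv_send_.wait(lock, [this] { return queue_.size() < capacity_ || closed_; });
            if (closed_) throw std::runtime_error("send on closed channel");
        }
        queue_.push(value);
        cv_recv_.notify_one();
    }

    std::optional<T> receive() {
        std::unique_lock<std::mutex> lock(mtx_);
        if (capacity_ == 0) {
            ++waiting_receivers_;   // announce that a receiver is ready
            cv_send_.notify_one();
        }
        cv_recv_.wait(lock, [this] { return !queue_.empty() || closed_; });
        if (queue_.empty()) {       // only reachable once the channel is closed
            if (capacity_ == 0) --waiting_receivers_;  // retire an unclaimed slot
            return std::nullopt;
        }
        T value = std::move(queue_.front());
        queue_.pop();
        cv_send_.notify_one();      // space is available again
        return value;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            closed_ = true;
        }
        cv_send_.notify_all();
        cv_recv_.notify_all();
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_send_, cv_recv_;
    std::queue<T> queue_;
    std::size_t capacity_;
    std::size_t waiting_receivers_ = 0;
    bool closed_ = false;
};

// Sends 1..n to a consumer thread and returns the values in arrival order.
std::vector<int> roundtrip(std::size_t capacity, int n) {
    MiniChannel<int> ch(capacity);
    std::vector<int> received;
    std::thread consumer([&] {
        while (auto v = ch.receive()) received.push_back(*v);
    });
    for (int i = 1; i <= n; ++i) ch.send(i);
    ch.close();
    consumer.join();
    return received;
}
```

With a single producer and a single consumer, roundtrip(0, 5) and roundtrip(2, 5) both yield the values 1 through 5 in order; the only difference is how far the sender is allowed to run ahead of the receiver.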
Asynchronous and Non-Blocking Send
The async and non-blocking flavors of the send method are similar to the original implementation with only a few minor tweaks.
async_send
The async_send function is designed to send a value to the channel asynchronously, meaning it doesn't block the calling thread.
It returns a std::future<void>, which represents the eventual completion of the asynchronous operation.
std::launch::async is a launch policy that specifically requests std::async to run the function on a new thread.
The lambda supplied to std::async simply calls the regular send method.
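As a rough illustration of that wrapper, the sketch below builds async_send on top of a blocking send. TinyChannel and async_roundtrip are hypothetical names, and the channel is a bare bounded queue rather than the library's class; the only part meant to mirror the description is the std::async call that runs the regular send in the background.

```cpp
#include <condition_variable>
#include <cstddef>
#include <future>
#include <mutex>
#include <queue>
#include <utility>

// TinyChannel: hypothetical bounded channel with blocking send and receive.
template <typename T>
class TinyChannel {
public:
    explicit TinyChannel(std::size_t capacity) : capacity_(capacity) {}

    void send(const T& value) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push(value);
        not_empty_.notify_one();
    }

    T receive() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        not_full_.notify_one();
        return value;
    }

    // Runs the blocking send on another thread. The value is copied into the
    // lambda so it stays valid after the caller's argument goes away.
    std::future<void> async_send(const T& value) {
        return std::async(std::launch::async, [this, value] { send(value); });
    }

private:
    std::mutex mtx_;
    std::condition_variable not_full_, not_empty_;
    std::queue<T> queue_;
    std::size_t capacity_;
};

// Fills a capacity-1 channel, queues a second value asynchronously, then
// drains both. Encodes the two received values as first * 10 + second.
int async_roundtrip() {
    TinyChannel<int> ch(1);
    ch.send(1);                       // buffer is now full
    auto pending = ch.async_send(2);  // a plain send(2) would block here
    int first = ch.receive();         // frees the slot for the pending send
    pending.get();                    // returns once 2 has been enqueued
    int second = ch.receive();
    return first * 10 + second;
}
```

async_roundtrip() returns 12: the value 2 cannot enter the buffer until 1 has been received, so the order is fixed even though the second send runs on a different thread.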
try_send
try_send is largely similar to the regular send method, with the difference that it doesn't wait for any conditions to become true. It returns true if the send succeeds and false otherwise.
Asynchronous and Non-Blocking Receive
Similar to how the async and non-blocking send methods are implemented, the different flavors of the receive method include only subtle variations from the original implementation.
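The receive-side variants can be sketched the same way. Inbox and receive_flavors are hypothetical names, and the channel here is unbounded to keep the example short; the signatures are assumptions modeled on the receive() declaration shown earlier, not copied from the library.

```cpp
#include <condition_variable>
#include <future>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Inbox: hypothetical unbounded channel used to show the three receive flavors.
template <typename T>
class Inbox {
public:
    void send(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(value);
        }
        cv_.notify_one();
    }

    // Blocking: waits for a value or for the channel to be closed.
    std::optional<T> receive() {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return !queue_.empty() || closed_; });
        return pop_locked();
    }

    // Non-blocking: returns std::nullopt right away if nothing is queued.
    std::optional<T> try_receive() {
        std::lock_guard<std::mutex> lock(mtx_);
        return pop_locked();
    }

    // Asynchronous: runs the blocking receive on another thread.
    std::future<std::optional<T>> async_receive() {
        return std::async(std::launch::async, [this] { return receive(); });
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    // Caller must hold mtx_.
    std::optional<T> pop_locked() {
        if (queue_.empty()) return std::nullopt;
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<T> queue_;
    bool closed_ = false;
};

// Returns the value delivered through async_receive, or -1 if any step
// behaved unexpectedly.
int receive_flavors() {
    Inbox<int> inbox;
    bool empty_at_first = !inbox.try_receive().has_value();  // nothing queued yet
    auto pending = inbox.async_receive();  // waits in the background
    inbox.send(7);
    std::optional<int> got = pending.get();
    return (empty_at_first && got.has_value()) ? *got : -1;
}
```

receive_flavors() returns 7: try_receive comes back empty without waiting, while the future from async_receive completes only after a value has been sent.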
Building the Selector
The Selector class allows monitoring multiple Channel objects simultaneously for incoming messages. It provides a way to handle data from multiple channels on a single thread, without dedicating a blocked receiver to each one.
Selector flow
Channel Registration:
When add_receive() is called on the Selector:
a. It registers the Selector with the Channel using register_selector().
b. It adds a lambda function to the channels vector. This lambda:
Checks if the channel is closed.
Tries to receive a value from the channel.
Calls the provided callback if a value is received.
Select Operation:
The select() method enters a wait-loop until a stop is requested or all channels are processed.
Inside the loop:
It waits on the condition variable until either:
A stop is requested, or
Any channel has data available (checked by calling the lambda functions in channels).
Once awoken, it processes all channels that have data available:
If a channel returns true (indicating it's done or closed), it's removed from the list.
If a channel returns false, it's kept in the list for future processing.
If all channels are closed and removed, the loop breaks.
Notification Mechanism:
Channels notify the Selector when new data is available, in the Channel's send() method, after pushing a value:
for (auto selector : selectors) { selector->notify(); }
This calls the Selector's notify() method, which wakes up the waiting select() method.
Stopping the Selector:
The stop() method sets the stop_flag_ and calls notify().
This causes the select() method to wake up and check the stop condition.
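The flow above can be approximated in a compact sketch. MiniSelector, Chan, is_drained and select_demo are hypothetical names, and the design differs from the description in one labeled way: instead of calling the channel lambdas inside the wait predicate, this version waits on a simple pending flag that notify() sets, then polls every registered channel.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// MiniSelector: hypothetical, simplified stand-in for the library's Selector.
class MiniSelector {
public:
    // Register channels before select() starts; handlers_ is not locked.
    template <typename Ch, typename Callback>
    void add_receive(Ch& ch, Callback cb) {
        ch.register_selector(this);
        handlers_.push_back([&ch, cb]() mutable {
            while (auto v = ch.try_receive()) cb(*v);  // drain what is available
            return ch.is_drained();  // true: closed and empty, so drop it
        });
    }
    void select() {
        while (!handlers_.empty()) {
            {
                std::unique_lock<std::mutex> lock(mtx_);
                cv_.wait(lock, [this] { return pending_ || stop_; });
                if (stop_) return;
                pending_ = false;
            }
            // Poll every channel without holding the selector's lock.
            for (auto it = handlers_.begin(); it != handlers_.end();) {
                if ((*it)()) it = handlers_.erase(it); else ++it;
            }
        }
    }
    void notify() { set_flag(pending_); }
    void stop() { set_flag(stop_); }
private:
    void set_flag(bool& flag) {
        { std::lock_guard<std::mutex> lock(mtx_); flag = true; }
        cv_.notify_all();
    }
    std::mutex mtx_;
    std::condition_variable cv_;
    bool pending_ = true;  // poll once on entry in case data is already queued
    bool stop_ = false;
    std::vector<std::function<bool()>> handlers_;
};

// Chan: hypothetical unbounded channel that notifies registered selectors.
template <typename T>
class Chan {
public:
    void register_selector(MiniSelector* s) {
        std::lock_guard<std::mutex> lock(mtx_);
        selectors_.push_back(s);
    }
    void send(const T& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        queue_.push(value);
        for (auto* s : selectors_) s->notify();
    }
    void close() {
        std::lock_guard<std::mutex> lock(mtx_);
        closed_ = true;
        for (auto* s : selectors_) s->notify();
    }
    std::optional<T> try_receive() {
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty()) return std::nullopt;
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
    bool is_drained() {
        std::lock_guard<std::mutex> lock(mtx_);
        return closed_ && queue_.empty();
    }
private:
    std::mutex mtx_;
    std::queue<T> queue_;
    bool closed_ = false;
    std::vector<MiniSelector*> selectors_;
};

// Sums the ints and the string lengths received through one selector.
int select_demo() {
    Chan<int> ints;
    Chan<std::string> strs;
    MiniSelector sel;
    int total = 0;  // only touched by the consumer until it is joined
    sel.add_receive(ints, [&](int v) { total += v; });
    sel.add_receive(strs, [&](const std::string& s) { total += static_cast<int>(s.size()); });
    std::thread consumer([&] { sel.select(); });
    for (int i = 1; i <= 3; ++i) ints.send(i);
    strs.send("ab");
    ints.close();
    strs.close();
    consumer.join();  // select() returns once both channels are drained
    return total;
}
```

select_demo() returns 8 (1 + 2 + 3 from the int channel plus the length of "ab"). It ends the loop by closing both channels so that every message is delivered; calling stop() instead would end select() as soon as the flag is seen, possibly leaving queued values unread.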
Conclusion
In this issue, we went deep into how one can design and implement Go-like channels in C++ along with a multiplexed for-select-like structure. These channels simplify writing highly concurrent applications by keeping the locking inside the channel, so application code doesn't have to manage locks directly.
Additionally, we explored modern C++'s powerful concurrency primitives like std::unique_lock and std::condition_variable, which allow us to write clean and maintainable concurrent code.
The complete code for the library is available at github.com/JyotinderSingh/CppChan. Be sure to give the repository a star if you find it useful.