Build a Concurrent Publish/Subscribe Message Broker with Go in Under 500 Lines
It's like Redis Pub/Sub but worse
First, an interview question
“How do you scale a web-socket-based chat application?”
If you’ve ever prepared for a system design interview, you know that web sockets are stateful and therefore tricky to scale. The system that helps you solve this problem is what we’ll be discussing in this issue.
If you have no idea what any of this means, then rather than giving you a primer I’ll ask you to watch this excellent video by Hussein Nasser, once you’ve read this issue :)
Introduction
Publish/Subscribe is a messaging pattern commonly used in distributed applications, where publishers send messages without knowing who the subscribers are. Subscribers, on the other hand, express interest in one or more categories of messages and receive only those that interest them.
In this pattern, components typically interact with a Topic (or a channel), either by publishing messages to it or subscribing to receive messages from it. The management of these topics and the subscriptions to them are typically handled by a component known as the Message Broker.
One of the most commonly used implementations of this pattern is provided by Redis, known as Redis Pub/Sub.
Different Publish/Subscribe implementations can offer different auxiliary functionalities to serve increasingly sophisticated use cases such as:
Durable subscriptions: Ensures that messages are not lost even if a subscriber is temporarily disconnected. This is crucial for maintaining message continuity in systems where subscribers might have intermittent connectivity.
Message Retention: This feature allows you to define policies controlling how long the system retains messages on the disk, ensuring the availability of historical data as needed.
At-Least-Once, At-Most-Once, and Exactly-Once Delivery Semantics:
At-Least-Once: Ensures that messages are delivered at least once but may result in duplicates.
At-Most-Once: Guarantees that messages are delivered no more than once, possibly leading to the loss of some messages.
Exactly-Once: Provides the most stringent guarantee, ensuring each message is delivered exactly once but is extremely hard to achieve and requires sophisticated publisher and consumer-side support (offered by Kafka).
Dead Letter Queues: Handles messages that cannot be delivered or processed. This is useful for debugging or for subsequent processing of undeliverable messages.
Message Ordering: Maintains the order in which messages are sent and received. This ensures that the sequence of messages reflects their original sending order, which is critical in many applications.
However, Redis Pub/Sub focuses on simplicity and provides guarantees that are both easy to understand and require minimal configuration:
At-Most-Once delivery semantics: A message will be delivered once if at all. There are no retries. If the subscriber is unable to handle the message (for example, due to an error or a network disconnect) the message is lost forever. This makes it suitable for real-time messaging scenarios where timely delivery is more important than reliability.
Fire & Forget: There are no acknowledgments to any of the messages that a publisher sends. The publisher fires off the message and moves on. This is ideal for scenarios where the overhead of tracking message delivery is not justified.
No message persistence: Messages aren’t persisted on the disk and cannot be recovered.
We’ll be implementing something similar, and it seems fair that our implementation does not guarantee anything more than what Redis does. Supporting more sophisticated functionality is left as an exercise for the reader :)
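To make the at-most-once, fire & forget behaviour concrete, here is a minimal sketch in plain Go. It is not taken from the broker's code: the `publish` helper is hypothetical, and a buffered channel stands in for a subscriber connection. The point is only that a single delivery attempt is made, with no retry and no acknowledgment.

```go
package main

import "fmt"

// publish makes exactly one non-blocking delivery attempt to sub.
// There is no retry and no acknowledgment: if the subscriber cannot
// take the message right now, the message is dropped (at-most-once).
func publish(sub chan<- string, msg string) bool {
	select {
	case sub <- msg:
		return true
	default:
		return false // dropped, never redelivered
	}
}

func main() {
	sub := make(chan string, 1) // subscriber with room for one pending message

	fmt.Println(publish(sub, "first"))  // true: accepted
	fmt.Println(publish(sub, "second")) // false: subscriber busy, message lost
	fmt.Println(<-sub)                  // first
}
```

The boolean return value exists only so the demo can print what happened; a true fire & forget publisher would ignore it.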
Implementation
The complete code which we’ll be working towards is available on my GitHub.
We’ll be writing a pluggable, library-like implementation. The broker runs on a single node and is capable of serving requests over a network to remote clients. It can handle an arbitrary number of publishers and consumers (up to the physical network and hardware limits, of course).
As already mentioned, the broker will implement at-most-once delivery semantics in a fire & forget fashion, and offer no message persistence. The broker will be capable of handling concurrent publishing requests from multiple publishers.
Defining a gRPC Service
We’ll be using gRPC for communication between the publishers, consumers, and the broker. gRPC is a high-performance RPC framework developed by Google that uses Protocol Buffers for serialization. It offers us the following functionality:
HTTP/2: Allows for multiplexing multiple requests over a single connection. This reduces overhead and latency, particularly beneficial for a pub/sub system with high-throughput and low-latency requirements.
Scalability: Supports a large number of concurrent connections, which is essential for a broker that may need to handle connections from several publishers and subscribers.
Efficient Encoding: Protocol Buffers, the serialization format gRPC uses by default, applies varint and ZigZag encoding to integers out of the box, which helps reduce the size of data transmitted over the network.
Streaming Support: gRPC supports streams, which allows subscribers to receive a continuous flow of messages over a longer duration. This feature eliminates the need for frequent polling to check for new messages. With streaming, data is pushed to subscribers as it becomes available.
We define the following gRPC service for the broker:
This service exposes the Subscribe, Unsubscribe, and Publish RPCs for the publishers and consumers to interact with.
The Subscribe RPC returns a stream. When a consumer subscribes to a topic, a gRPC stream is established between the consumer and the broker. This stream is maintained for the entire duration of the subscription, ensuring continuous, real-time communication without significant resource overhead. We’ll discuss this pattern in more detail in the following sections.
When the broker receives a message on a topic, it uses the streams registered under that topic to push messages to the respective consumers.
It is important to note that consumers are required to send their unique IDs when subscribing to or unsubscribing from a topic. The broker needs this ID to be able to identify the consumer’s respective stream under a given topic.
You can generate the Go stubs from this service definition using the protoc compiler.
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative api.proto
The Broker
The broker is the central component of our pub/sub implementation, responsible for maintaining subscriber lists for each topic and routing messages to the relevant topics accordingly.
The architecture of the broker is as follows:
As we've discussed, when a consumer subscribes to a new topic, a new stream is created specifically for that topic. This raises a question: why not use a single stream for all subscriptions of a given consumer? Both approaches are viable, but each comes with its advantages and trade-offs.
Separate Stream for Each Topic a Consumer is Subscribed to
Pros
Isolation of Topics: Separate streams ensure that the message flow for one topic does not interfere with another.
Parallel Processing: Different streams can be processed in parallel, potentially improving performance.
Fault Tolerance: If one stream encounters an error or needs to be dropped, it does not affect the other streams. This can happen in case one of the topics is overwhelmed with messages.
Cons
Resource Overhead: Each stream consumes resources. The overhead can become significant if there are many topics, potentially leading to higher memory and CPU usage.
Complex Management: Managing multiple streams can be more complex than handling a single stream, especially in terms of monitoring and lifecycle management.
Potentially Higher Latency: Setting up a new stream may introduce latency, particularly if subscribers frequently change topics.
Common Stream for All Topics a Consumer is Subscribed to
Pros
Resource Efficiency: A single stream minimizes the overhead of multiple connections.
Simplified Architecture: Managing one stream reduces the complexity for both the consumer and the broker.
Quicker Subscription Changes: Altering subscriptions is faster as it doesn’t involve establishing or terminating streams.
Cons
Head-of-Line Blocking: A slowdown in the stream due to one topic can delay message delivery for others.
Reduced Fault Isolation: Any disruption in the stream affects all subscribed topics, increasing the risk from single points of failure.
We’ll be going ahead with maintaining separate streams for each topic in our implementation.
The above code snippet represents the core structure of our broker.
streamKey: Uniquely identifies each stream established with the broker.
pb.UnimplementedPubSubServiceServer: This embedded field represents the gRPC service implemented by our broker, stemming from the gRPC service definition we've previously discussed.
port, listener, grpcServer: These elements control the network configuration for the gRPC server.
subscribers: A map where topic names are the keys and another map (linking subscriber IDs to their respective streams) is the value. This structure allows the broker to manage multiple subscribers (each representing one consumer) for multiple topics.
topicSubscriberStreamMutexes: This map, keyed by streamKey and pointing to sync.Mutex instances, synchronizes access to each subscriber stream. This synchronization is critical to prevent race conditions during message transmission. Given that multiple publishers might concurrently publish on the same topic, and considering gRPC's restriction against concurrent writes to a stream, safeguarding each stream with its own mutex becomes essential.
mu: This mutex synchronizes access to both the topicSubscriberStreamMutexes and the subscribers maps. By opting for a read-write mutex, we can handle simultaneous publishing requests efficiently. Given that subscription modifications (which require write locks) are expected to occur less frequently than publishing requests (which only need read access), using a read-write lock allows us to improve the throughput of our system.
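The fields described above could be laid out roughly as follows. This is a sketch rather than the author's exact code: `Message` and `Stream` are hypothetical stand-ins for the protoc-generated message and server-stream types, `NewBroker` is an assumed constructor name, and the gRPC-specific fields (the embedded service, `port`, `listener`, `grpcServer`) are left out so the snippet compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Message and Stream are hypothetical stand-ins for the generated
// protobuf message and the server-side gRPC stream.
type Message struct {
	Topic   string
	Payload []byte
}

type Stream interface {
	Send(*Message) error
}

// streamKey uniquely identifies one subscriber's stream on one topic.
type streamKey struct {
	topic        string
	subscriberId string
}

// Broker holds the routing state; gRPC server fields are omitted here.
type Broker struct {
	subscribers                  map[string]map[string]Stream // topic -> subscriber ID -> stream
	topicSubscriberStreamMutexes map[streamKey]*sync.Mutex    // one mutex per stream
	mu                           sync.RWMutex                 // guards both maps
}

// NewBroker (assumed name) initializes both maps so they are safe to write to.
func NewBroker() *Broker {
	return &Broker{
		subscribers:                  make(map[string]map[string]Stream),
		topicSubscriberStreamMutexes: make(map[streamKey]*sync.Mutex),
	}
}

func main() {
	b := NewBroker()

	// A struct of comparable fields can be used directly as a map key.
	k := streamKey{topic: "news", subscriberId: "consumer-1"}
	b.topicSubscriberStreamMutexes[k] = &sync.Mutex{}

	_, ok := b.topicSubscriberStreamMutexes[streamKey{"news", "consumer-1"}]
	fmt.Println(ok) // true
}
```

Using a small struct as the map key avoids building and parsing concatenated strings such as "topic:subscriber" and keeps lookups cheap.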
We won’t be going over the server initialization and startup methods, since they are straightforward. If you’d like a closer look, you can refer to the complete code linked above.
Handling Subscriptions
Consumer flow
Check Existing Subscription: The method starts by checking if the consumer is already subscribed to the specified topic. If the consumer is already subscribed the method returns immediately, avoiding duplicate subscriptions.
If the consumer is not already subscribed, a new cancellable context streamCtx is created. This context is used to manage the stream’s lifecycle and allows cancellation of the stream when unsubscribing.
Initiating Subscription: The consumer then initiates a subscription to the topic by making an RPC call to the Subscribe method on the Broker. The broker initiates a server-side stream and returns it to the consumer.
If the subscription is successful, the cancel function for the stream's context is stored in c.subscriptions which is a concurrent map. This allows the consumer to manage the stream and cancel it when necessary.
Finally, a new goroutine is started to receive any messages sent on the topic associated with this subscription.
The receive goroutine is responsible for receiving messages on its respective topic. It pushes all the messages it receives into the consumer’s Messages channel. We use a common channel to receive messages from all the topics for the sake of simplicity.
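The consumer-side steps above can be sketched as follows. To keep the example runnable without generated gRPC code, `MessageStream` and `SubscribeFunc` are hypothetical stand-ins for the generated client stream and the Subscribe RPC, and `chanStream` and `newDemoConsumer` are toy helpers; only `streamCtx`, `c.subscriptions`, `receive`, and `Messages` come from the description.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// MessageStream stands in for the generated client-side stream (assumption).
type MessageStream interface {
	Recv() (string, error)
}

// SubscribeFunc stands in for the Subscribe RPC on the generated client (assumption).
type SubscribeFunc func(ctx context.Context, id, topic string) (MessageStream, error)

type Consumer struct {
	ID            string
	Messages      chan string // shared by all topics
	subscriptions sync.Map    // topic -> context.CancelFunc
	subscribe     SubscribeFunc
}

func (c *Consumer) Subscribe(topic string) error {
	if _, ok := c.subscriptions.Load(topic); ok {
		return nil // already subscribed, nothing to do
	}
	streamCtx, cancel := context.WithCancel(context.Background())
	stream, err := c.subscribe(streamCtx, c.ID, topic)
	if err != nil {
		cancel()
		return err
	}
	c.subscriptions.Store(topic, cancel) // kept so Unsubscribe can cancel the stream
	go c.receive(stream)
	return nil
}

// receive forwards messages until the stream ends or its context is cancelled.
func (c *Consumer) receive(stream MessageStream) {
	for {
		msg, err := stream.Recv()
		if err != nil {
			return
		}
		c.Messages <- msg
	}
}

// chanStream is a toy in-memory stream used only to run the sketch.
type chanStream struct {
	ctx context.Context
	ch  chan string
}

func (s chanStream) Recv() (string, error) {
	select {
	case m := <-s.ch:
		return m, nil
	case <-s.ctx.Done():
		return "", s.ctx.Err()
	}
}

// newDemoConsumer wires a Consumer to a fake broker that serves from feed.
func newDemoConsumer(feed chan string) *Consumer {
	return &Consumer{
		ID:       "consumer-1",
		Messages: make(chan string, 8),
		subscribe: func(ctx context.Context, id, topic string) (MessageStream, error) {
			return chanStream{ctx: ctx, ch: feed}, nil
		},
	}
}

func main() {
	feed := make(chan string, 1)
	c := newDemoConsumer(feed)
	if err := c.Subscribe("news"); err != nil {
		panic(err)
	}
	feed <- "hello"
	fmt.Println(<-c.Messages) // hello
}
```

Note that the check and the later store are two separate steps, so two goroutines subscribing to the same topic at the same moment could both get through; a stricter version might reserve the slot first with `sync.Map.LoadOrStore`.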
Broker flow
The Subscribe method in the broker implements the Subscribe RPC that we defined earlier. The stream parameter in the function represents the server-side gRPC stream that is used to send messages to the subscriber.
We first acquire a lock on the shared subscriber and stream-mutex maps to make them available for mutation.
If there are no existing subscribers for the requested topic (b.subscribers[key.topic]), a new map is created to hold subscriber streams for that topic.
The subscriber's stream is stored in the map (b.subscribers[key.topic][key.subscriberId] = stream). This associates the subscriber with the topic stream.
A new mutex is created for this stream and stored in the b.topicSubscriberStreamMutexes map. This mutex is used to synchronize access to this specific subscriber stream.
The broker releases the lock on these shared maps once the insertions are done to allow other operations to proceed.
The function finally enters a stream-management loop, continually checking two conditions using a select statement:
Client Stream Closure: If the client closes the stream (<-stream.Context().Done()), the function returns. This implies that the client has unsubscribed or disconnected.
Broker Shutdown: If the broker’s context is canceled (<-b.ctx.Done()), indicating a broker shutdown, the function also returns.
It’s important that the method blocks here: the server-side stream is closed as soon as this RPC method returns, so returning early would end the subscription.
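The broker-side subscription flow can be sketched like this. The `Stream` interface, `fakeStream`, `newFakeStream`, `NewBroker`, and `Shutdown` are assumptions made so the snippet runs without generated gRPC code; the real method would receive the request and stream types produced by protoc. Returning `nil` on both branches is also an assumption, since the text only says that the function returns.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Stream is a hypothetical stand-in for the generated server-side stream.
type Stream interface {
	Send(msg string) error
	Context() context.Context
}

type streamKey struct {
	topic        string
	subscriberId string
}

type Broker struct {
	ctx                          context.Context // cancelled on broker shutdown
	cancel                       context.CancelFunc
	subscribers                  map[string]map[string]Stream
	topicSubscriberStreamMutexes map[streamKey]*sync.Mutex
	mu                           sync.RWMutex
}

func NewBroker() *Broker {
	ctx, cancel := context.WithCancel(context.Background())
	return &Broker{
		ctx:                          ctx,
		cancel:                       cancel,
		subscribers:                  make(map[string]map[string]Stream),
		topicSubscriberStreamMutexes: make(map[streamKey]*sync.Mutex),
	}
}

// Shutdown cancels the broker context, unblocking every Subscribe call.
func (b *Broker) Shutdown() { b.cancel() }

// Subscribe registers the stream, then blocks until the client goes away
// or the broker shuts down. Returning earlier would close the stream.
func (b *Broker) Subscribe(topic, subscriberId string, stream Stream) error {
	key := streamKey{topic: topic, subscriberId: subscriberId}

	b.mu.Lock()
	if _, ok := b.subscribers[key.topic]; !ok {
		b.subscribers[key.topic] = make(map[string]Stream)
	}
	b.subscribers[key.topic][key.subscriberId] = stream
	b.topicSubscriberStreamMutexes[key] = &sync.Mutex{}
	b.mu.Unlock()

	select {
	case <-stream.Context().Done(): // client unsubscribed or disconnected
		return nil
	case <-b.ctx.Done(): // broker shutdown
		return nil
	}
}

// fakeStream is a toy stream used only to run the sketch.
type fakeStream struct{ ctx context.Context }

func (s fakeStream) Send(string) error        { return nil }
func (s fakeStream) Context() context.Context { return s.ctx }

func newFakeStream() (Stream, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	return fakeStream{ctx: ctx}, cancel
}

func main() {
	b := NewBroker()
	stream, disconnect := newFakeStream()

	done := make(chan error, 1)
	go func() { done <- b.Subscribe("news", "consumer-1", stream) }()

	disconnect()        // simulate the client closing the stream
	fmt.Println(<-done) // <nil>
}
```

Because the select has no default case and is not wrapped in a for loop, the handler simply parks until one of the two contexts is done, which costs nothing while the subscription is idle.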
Handling Message Publishing
Publisher flow
The client flow is straightforward. It simply builds a PublishRequest object and calls the Publish RPC on the broker using the gRPC client.
Broker flow
The method starts by acquiring a read lock, allowing concurrent read access by multiple Publish calls but preventing writes, ensuring thread-safe access to the broker's metadata.
A slice brokenSubscribers is initialized to track any subscribers that fail to receive the message.
The method iterates over all subscribers of the given topic. For each subscriber, it:
Creates a streamKey unique to the subscriber and topic.
Acquires a lock specific to that subscriber's stream. This prevents other concurrent operations from writing to this stream while this goroutine is writing to it.
Sends the message on this stream.
Releases the lock on this stream after attempting to send the message. If an error occurs during sending, the subscriber is added to the brokenSubscribers list for later removal.
We could handle this more gracefully with fine-grained handling for each error scenario, since not every error indicates a broken stream, but we omit that for the sake of simplicity.
The read lock on the metadata is released.
b.removeBrokenSubscribers(brokenSubscribers) is called to clean up any subscribers who couldn't receive the message.
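The publish path described above might look roughly like this. The `Stream` interface, `recordingStream`, `register`, and `NewBroker` are hypothetical helpers added so the sketch runs on its own, and the body of `removeBrokenSubscribers` is a guess at what the cleanup does, since the text only names the method.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Stream is a hypothetical stand-in for the generated server-side stream.
type Stream interface {
	Send(msg string) error
}

type streamKey struct {
	topic        string
	subscriberId string
}

type Broker struct {
	subscribers                  map[string]map[string]Stream
	topicSubscriberStreamMutexes map[streamKey]*sync.Mutex
	mu                           sync.RWMutex
}

func NewBroker() *Broker {
	return &Broker{
		subscribers:                  make(map[string]map[string]Stream),
		topicSubscriberStreamMutexes: make(map[streamKey]*sync.Mutex),
	}
}

// register mimics the bookkeeping done by Subscribe; used to set up the demo.
func (b *Broker) register(topic, subscriberId string, s Stream) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.subscribers[topic] == nil {
		b.subscribers[topic] = make(map[string]Stream)
	}
	b.subscribers[topic][subscriberId] = s
	b.topicSubscriberStreamMutexes[streamKey{topic, subscriberId}] = &sync.Mutex{}
}

func (b *Broker) Publish(topic, msg string) {
	b.mu.RLock() // many Publish calls may hold the read lock at once
	var brokenSubscribers []streamKey
	for subscriberId, stream := range b.subscribers[topic] {
		key := streamKey{topic: topic, subscriberId: subscriberId}
		streamMu := b.topicSubscriberStreamMutexes[key]

		streamMu.Lock() // only one writer per stream at a time
		err := stream.Send(msg)
		streamMu.Unlock()

		if err != nil {
			brokenSubscribers = append(brokenSubscribers, key)
		}
	}
	b.mu.RUnlock() // released before cleanup, which needs the write lock

	b.removeBrokenSubscribers(brokenSubscribers)
}

func (b *Broker) removeBrokenSubscribers(keys []streamKey) {
	if len(keys) == 0 {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, key := range keys {
		delete(b.subscribers[key.topic], key.subscriberId)
		delete(b.topicSubscriberStreamMutexes, key)
	}
}

// recordingStream is a toy stream that records messages or always fails.
type recordingStream struct {
	fail bool
	got  []string
}

func (s *recordingStream) Send(msg string) error {
	if s.fail {
		return errors.New("stream broken")
	}
	s.got = append(s.got, msg)
	return nil
}

func main() {
	b := NewBroker()
	healthy := &recordingStream{}
	b.register("news", "consumer-1", healthy)
	b.register("news", "consumer-2", &recordingStream{fail: true})

	b.Publish("news", "hello")
	fmt.Println(healthy.got, len(b.subscribers["news"])) // [hello] 1
}
```

The read lock has to be released before the cleanup step because Go's `sync.RWMutex` cannot be upgraded from a read lock to a write lock; calling `Lock` while still holding `RLock` in the same goroutine would deadlock.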
Our current implementation has a limitation in its handling of publishing requests. When a message is published to a topic, it is not dispatched to all subscribers concurrently. Instead, we iterate over the topic's subscribers one by one. This approach may introduce a performance bottleneck, especially for topics with a significantly large number of subscriptions.
I would encourage the reader to explore an alternative approach by implementing a worker pool. This would enable sending messages to multiple subscribers of a topic concurrently. However, it's interesting to note that the anticipated improvement in latency might not be as significant as expected in most scenarios. This is because the overhead associated with inter-thread message passing could potentially offset the benefits of parallel processing when compared to our current method, which utilizes a sequential but minimalistic for-loop.
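As a starting point for that exercise, here is one possible shape for a bounded worker pool, written as a standalone function rather than as a change to the broker. Everything in it (`Sender`, `SenderFunc`, `errBroken`, `fanOut`, the `workers` parameter) is hypothetical. It also assumes each stream is written to by only one pool at a time; inside the broker, the per-stream mutexes would still be needed because several Publish calls can run concurrently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Sender is a hypothetical stand-in for a subscriber stream.
type Sender interface {
	Send(msg string) error
}

// SenderFunc adapts a plain function to the Sender interface.
type SenderFunc func(msg string) error

func (f SenderFunc) Send(msg string) error { return f(msg) }

var errBroken = errors.New("stream broken")

// fanOut delivers msg to every subscriber using at most `workers`
// goroutines and returns the IDs of subscribers whose send failed.
func fanOut(subs map[string]Sender, msg string, workers int) []string {
	if workers < 1 {
		workers = 1
	}
	jobs := make(chan string)
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards broken
		broken []string
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs {
				// subs is only read here, so concurrent access is safe.
				if err := subs[id].Send(msg); err != nil {
					mu.Lock()
					broken = append(broken, id)
					mu.Unlock()
				}
			}
		}()
	}
	for id := range subs {
		jobs <- id
	}
	close(jobs)
	wg.Wait()
	return broken
}

func main() {
	ok := SenderFunc(func(string) error { return nil })
	bad := SenderFunc(func(string) error { return errBroken })
	subs := map[string]Sender{"consumer-1": ok, "consumer-2": ok, "consumer-3": bad}

	fmt.Println(fanOut(subs, "hello", 2)) // [consumer-3]
}
```

This version starts its workers on every call, which is exactly the kind of overhead the paragraph above warns about; a long-lived pool shared across publishes would amortize it, at the cost of more lifecycle management.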
Handling Unsubscribe
Consumer flow
Check Subscription Status: We begin by checking if the consumer is currently subscribed to the specified topic. If the consumer is not subscribed an error is returned, indicating that the consumer cannot unsubscribe from a topic to which it is not subscribed.
Cancel the Subscription’s Stream Context: If the consumer is subscribed, its subscription stream context is canceled. This is done by calling the cancel function associated with the context of this subscription’s stream. This cancel function was created when the consumer subscribed to the topic.
Calling this function cancels the context, which in turn should signal the goroutine running receive to stop receiving messages and terminate.
The consumer then removes the subscription entry for the topic from its local subscriptions map.
Finally, the consumer sends an unsubscribe request to the broker. This is done by calling the Unsubscribe RPC on the Broker. The request includes the topic and the consumer’s ID so that the broker can identify and process the subscription.
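A minimal sketch of these consumer-side steps follows. `UnsubscribeFunc` is a hypothetical stand-in for the Unsubscribe RPC on the generated client, and `track` and `newDemoConsumer` are toy helpers that simulate an earlier subscription and a fake broker; none of them appear in the original code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// UnsubscribeFunc stands in for the Unsubscribe RPC on the generated client (assumption).
type UnsubscribeFunc func(ctx context.Context, id, topic string) error

type Consumer struct {
	ID            string
	subscriptions sync.Map // topic -> context.CancelFunc
	unsubscribe   UnsubscribeFunc
}

func (c *Consumer) Unsubscribe(topic string) error {
	v, ok := c.subscriptions.Load(topic)
	if !ok {
		return fmt.Errorf("not subscribed to topic %q", topic)
	}
	cancel := v.(context.CancelFunc)
	cancel()                      // stops the receive goroutine for this topic
	c.subscriptions.Delete(topic) // forget the local subscription
	return c.unsubscribe(context.Background(), c.ID, topic)
}

// track simulates what Subscribe would have stored for a topic and
// returns a channel that is closed once the stream context is cancelled.
func (c *Consumer) track(topic string) <-chan struct{} {
	ctx, cancel := context.WithCancel(context.Background())
	c.subscriptions.Store(topic, cancel)
	return ctx.Done()
}

// newDemoConsumer wires a Consumer to a fake broker that records calls.
func newDemoConsumer(calls *[]string) *Consumer {
	return &Consumer{
		ID: "consumer-1",
		unsubscribe: func(_ context.Context, _, topic string) error {
			*calls = append(*calls, topic)
			return nil
		},
	}
}

func main() {
	var calls []string
	c := newDemoConsumer(&calls)

	fmt.Println(c.Unsubscribe("news")) // not subscribed to topic "news"

	streamDone := c.track("news")
	fmt.Println(c.Unsubscribe("news")) // <nil>
	<-streamDone                       // the stream context was cancelled
	fmt.Println(calls)                 // [news]
}
```

Cancelling locally before telling the broker means the consumer stops reading immediately, even if the Unsubscribe RPC is slow or fails.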
Broker flow
The broker begins by acquiring a write lock to ensure exclusive access to the subscriber metadata.
A streamKey is constructed using the topic and subscriber ID from the unsubscription request. This key uniquely identifies the subscriber's stream for the given topic.
The broker checks if the specified topic exists in the subscribers map. If the topic does not exist, it returns an error, indicating that the unsubscription was unsuccessful because the topic is unknown.
Next, the broker checks if the subscriber ID exists under the specified topic. If the subscriber ID is not found, it similarly returns an error, indicating the subscriber was not found for the given topic.
If both the topic and subscriber ID exist, the broker proceeds to remove the subscriber from the subscribers map for that topic.
The broker also deletes the mutex associated with the subscriber's stream.
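The broker-side steps above can be sketched as follows. As before, `Stream`, `NewBroker`, and `register` are hypothetical helpers that let the snippet run without generated gRPC code, and the error messages are illustrative rather than the ones the real broker returns.

```go
package main

import (
	"fmt"
	"sync"
)

// Stream is a hypothetical stand-in for the generated server-side stream.
type Stream interface {
	Send(msg string) error
}

type streamKey struct {
	topic        string
	subscriberId string
}

type Broker struct {
	subscribers                  map[string]map[string]Stream
	topicSubscriberStreamMutexes map[streamKey]*sync.Mutex
	mu                           sync.RWMutex
}

func NewBroker() *Broker {
	return &Broker{
		subscribers:                  make(map[string]map[string]Stream),
		topicSubscriberStreamMutexes: make(map[streamKey]*sync.Mutex),
	}
}

// register mimics the bookkeeping done by Subscribe; used to set up the demo.
func (b *Broker) register(topic, subscriberId string, s Stream) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.subscribers[topic] == nil {
		b.subscribers[topic] = make(map[string]Stream)
	}
	b.subscribers[topic][subscriberId] = s
	b.topicSubscriberStreamMutexes[streamKey{topic, subscriberId}] = &sync.Mutex{}
}

func (b *Broker) Unsubscribe(topic, subscriberId string) error {
	b.mu.Lock() // write lock: both maps are about to change
	defer b.mu.Unlock()

	key := streamKey{topic: topic, subscriberId: subscriberId}

	subs, ok := b.subscribers[key.topic]
	if !ok {
		return fmt.Errorf("topic %q not found", key.topic)
	}
	if _, ok := subs[key.subscriberId]; !ok {
		return fmt.Errorf("subscriber %q not found for topic %q", key.subscriberId, key.topic)
	}

	delete(subs, key.subscriberId)
	delete(b.topicSubscriberStreamMutexes, key)
	return nil
}

func main() {
	b := NewBroker()
	b.register("news", "consumer-1", nil)

	fmt.Println(b.Unsubscribe("news", "consumer-1"))   // <nil>
	fmt.Println(b.Unsubscribe("news", "consumer-1"))   // subscriber "consumer-1" not found for topic "news"
	fmt.Println(b.Unsubscribe("sports", "consumer-1")) // topic "sports" not found
}
```

In this sketch an emptied topic map is left in place after its last subscriber leaves, which is why the second call reports a missing subscriber rather than a missing topic.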
Conclusion
This concludes our in-depth discussion on implementing a concurrent Publish/Subscribe Message Broker in Go, in under 500 lines of code.
Throughout this exploration, we delved into both the architectural design and the intricate low-level implementation details required to build this system from the ground up. I hope that this issue has not only deepened your understanding of how Pub/Sub systems function but also enriched your knowledge of gRPC and concurrency in Go.
If you’re interested in diving deeper into the code, or exploring how this code was tested - head over to the GitHub repo!
References
https://redis.io/docs/interact/pubsub/
https://redis.com/glossary/pub-sub/
https://raphaeldelio.medium.com/understanding-pub-sub-in-redis-18278440c2a9