Introducing Keyop Messenger
Contents
- What is Keyop Messenger?
- Why create another messaging library in Go?
- Features
- Getting Started
- Running the Example
What is Keyop Messenger?
keyop-messenger is a messaging library for Go. I created it to be the nervous system for 'keyop', the latest generation of a project I've been working on since 1999.
Why create another messaging library in Go?
The previous generation of keyop used Kafka for node-to-node communication, but running Kafka in the cloud for my personal project was expensive, and I had to use a different solution for intra-node communication that could tolerate being offline.
Ideally, I wanted a messaging system with these properties:
- offline pub and sub - channels for local event-based activity that would continue working when the instance is temporarily offline
- reliable at-least-once delivery - if a message gets published successfully, unless the disk gets corrupted, it shouldn't get lost
- federated - clients publish and subscribe to specific channels on hubs, hubs control which clients can access which channels, and it must be easy to configure
- secure - mTLS, each instance has its own cert, with an easy process to generate self-signed certs
- fast - when a client sends a message, I should typically see it on my dashboards almost instantly, even if it goes through multiple hops
- small and efficient - should run with minimal resources on tiny systems
- simple - I'm not looking for another complicated system to maintain
- observable - troubleshooting should be easy, e.g. grepping through the message history or debugging complicated multi-event processes with correlation IDs
- native Go - no CGO dependencies
Here are some things that keyop-messenger does NOT try to solve:
- extremely high throughput - A few thousand messages per second is more than enough for my use cases, and keyop-messenger should greatly exceed that.
- exactly-once delivery - A message may be delivered to a subscriber more than once. The subscriber needs to handle idempotency.
- client-specified offsets - Clients can't specify where they want to start in the log. On the initial subscription, they start with the first new event that arrives in the local channel after they subscribed. If they previously subscribed and then reconnect, they pick up where they left off. There is also an ephemeral client that can be used when you don't care about history and only want to see new messages as they arrive.
- consumer groups - Each client gets their own offset. Multiple workers can't reliably share the same offset.
- (federation) time sync - The originating system's clock is used to populate the message's timestamp. If system clocks are not in sync, events that happen at the same time on different systems will have different timestamps.
- (federation) message ordering across hosts - Messages are stored in the order they arrive on a given hub, not by the time they were created or initially sent. Clients could see messages from one host before messages from another that were actually sent earlier.
- (federation) message ordering across hubs - If a channel is published to multiple hubs, a client that pulls that channel from all of them gets no guarantee about the order in which messages are processed, even for messages generated by the same host. Duplicates will generally be prevented by the in-memory LRU cache of recently-seen message IDs.
Go has lots of great messaging libraries, but nothing quite lined up with my requirements.
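Since subscribers have to handle idempotency themselves, and the dedup cache only remembers recent IDs, it helps to see what an LRU set of message IDs looks like. Below is a minimal sketch using container/list. seenIDs and its methods are hypothetical names, not keyop-messenger's implementation (the library's own cache size shows up as Dedup:{SeenIDLRUSize:100000} in the config dump in the example output). A subscriber could use something like this to skip redeliveries within a bounded window.

```go
package main

import (
	"container/list"
	"fmt"
)

// seenIDs is a fixed-capacity LRU set of message IDs (hypothetical helper).
type seenIDs struct {
	capacity int
	order    *list.List               // front = most recently seen
	index    map[string]*list.Element // message ID -> element in order
}

func newSeenIDs(capacity int) *seenIDs {
	return &seenIDs{capacity: capacity, order: list.New(), index: make(map[string]*list.Element)}
}

// Seen reports whether id was already recorded, and records it if not.
// When the set is over capacity, the least recently seen ID is evicted.
func (s *seenIDs) Seen(id string) bool {
	if el, ok := s.index[id]; ok {
		s.order.MoveToFront(el)
		return true
	}
	s.index[id] = s.order.PushFront(id)
	if s.order.Len() > s.capacity {
		oldest := s.order.Back()
		s.order.Remove(oldest)
		delete(s.index, oldest.Value.(string))
	}
	return false
}

func main() {
	seen := newSeenIDs(2)
	for _, id := range []string{"a", "b", "a", "c", "b"} {
		if seen.Seen(id) {
			fmt.Println("duplicate, skipping:", id)
			continue
		}
		fmt.Println("processing:", id)
	}
}
```

In main, "b" gets processed twice because it was evicted before it showed up again. That's why a bounded cache only generally prevents duplicates; for strict idempotency, the handler needs to check durable state (for example, a record per processed message ID).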
Features
keyop-messenger is a pub-sub messaging library.
It's small (<10k lines of code) and lightweight, and it runs comfortably on an old Raspberry Pi.
Similar to Kafka, it provides named channels that services can use to communicate. Processes can easily publish messages to one or more channels, and/or subscribe to one or more channels to receive messages that are being published.
When a message gets published to a channel, it gets written to a file on disk. If the publisher gets back a successful response from Publish(), that means the message was successfully committed to disk.
Subscribers of a channel follow the channel log file for new messages. Each subscriber registers a callback function that gets called to process each message. If the callback returns no error, the message is considered successfully processed, and the subscriber's offset in the channel file is updated. If the callback returns an error, the message is retried a few times with exponential back-off, and if it keeps failing it is eventually moved to the dead-letter queue. If the application crashes or restarts after a message was successfully published but before subscribers finish processing it, the message is still waiting in the channel file and will be picked up by subscribers on restart. In edge cases like this a message might be delivered twice, but assuming the disk is reliable, every message is guaranteed to be delivered at least once to every subscriber.
The actual channel logs are stored in a directory of fixed-size append-only segment files. Writes to these channel files are atomic. The messaging library tracks the offsets of each client so that clients can resume where they left off. Old segment files get cleaned up once all clients have consumed all the messages in the file, or after some configurable maximum amount of time has passed.
Messaging is also federated, meaning that keyop-messenger running on a client host can subscribe or publish select channels to a hub. A given instance can easily be a client or a hub or both depending on the configuration. Messaging between hosts is secured by self-signed mTLS certificates. keyop-messenger comes with a command line utility to generate the certs. The hub configuration controls which clients can connect, and which channels they can publish and subscribe.
Federated messaging is low latency. Imagine a client publishes a message locally, the message gets delivered to a hub on a different host, and then another client on a third host picks it up from the hub. If you watch the channel logs on all three hosts at the same time, the message will typically show up in all three so quickly that it's hard to tell the source from the destinations.
Debugging messaging problems is easy, because you can just look at the channel log files (.jsonl format) to see whether or not messages are arriving, and you can watch the offsets to see if the messages are being picked up. If you don't see the messages you expect, you can check the audit log for more details on connections and inbound messages.
Getting Started
Here's an example of how to use the library.
Certificates
keyop-messenger uses "mutual authentication", or mTLS, meaning that the client authenticates the server (hub), and the server authenticates the client. Authentication involves checking that the peer's certificate was signed by the correct certificate authority (CA). Each instance has its own certificate that uniquely identifies it, which lets the server limit which channels each client can subscribe to and publish.
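For readers new to mTLS, this is roughly what the setup looks like with Go's standard crypto/tls package: each side presents its own certificate and only trusts peers whose certificates were signed by the shared CA. It's generic stdlib usage under my own naming (mutualTLSConfig is hypothetical), not keyop-messenger's internal code; TLS 1.3 is chosen to match MinVersion:1.3 in the example's config dump.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// mutualTLSConfig builds a tls.Config for either side of an mTLS connection:
// it presents this instance's certificate and only trusts peers signed by
// the CA in caFile.
func mutualTLSConfig(certFile, keyFile, caFile string, server bool) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no CA certificates found in " + caFile)
	}
	cfg := &tls.Config{
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS13,
	}
	if server {
		cfg.ClientCAs = pool                            // verify clients against the CA
		cfg.ClientAuth = tls.RequireAndVerifyClientCert // reject clients without a valid cert
	} else {
		cfg.RootCAs = pool // verify the hub against the CA
	}
	return cfg, nil
}

func main() {
	// File names follow the example; adjust them to where you keep the certs.
	cfg, err := mutualTLSConfig("host1.crt", "host1.key", "ca.crt", true)
	if err != nil {
		fmt.Println("could not build TLS config:", err)
		return
	}
	fmt.Println("hub TLS config requires client certs:", cfg.ClientAuth == tls.RequireAndVerifyClientCert)
}
```

With standard crypto/tls, the client side also checks that the hub's certificate is valid for the host name it dialed (here, localhost).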
So, we need to start by generating some self-signed certificates. keyop-messenger has a 'keygen' command for generating the necessary certs.
Before generating host certs, we need to create a CA to sign the certs. This command will generate the ca.crt and ca.key in the current working directory.
keyop-messenger keygen ca
Now we need to create certs for the instances. This example uses two messenger instances, but they both run on the same host, so we'll use 'localhost' as the name for both. Again, these are generated in the current working directory.
# host 1
keyop-messenger keygen instance \
--ca ca.crt --ca-key ca.key \
--name localhost \
--out-cert host1.crt \
--out-key host1.key
# host 2
keyop-messenger keygen instance \
--ca ca.crt --ca-key ca.key \
--name localhost \
--out-cert host2.crt \
--out-key host2.key
Now that we have the certs, copy host1.crt and host1.key to host1, host2.crt and host2.key to host2, and ca.crt to both; ca.crt is used to validate that the certs were signed by the same CA. Do not copy ca.key to the hosts, or someone could use it to generate more certs. Keep ca.key safe and don't lose track of it, in case you want to create more instance certs in the future. It's best for every instance to have its own cert so you can control which channels that specific instance can publish and subscribe.
On the instance hosts, you can keep the cert files anywhere you want; just point the TLS settings in the configuration at their location.
Payload Registration
In order to publish data, you need to start by creating structs for the data you want to send. The example below uses an 'Alert'.
type Alert struct {
	Message string `json:"message"`
}
Before sending or receiving the message, it is necessary to register the payload with the messenger. Here's how to register a payload.
err := m.RegisterPayloadType("com.example.alert.v1", Alert{})
It's good practice to version your payloads if you expect them to change over time and you might ever have multiple versions of a payload in flight. The code that handles the payloads can then check the version and handle each one appropriately. For small projects this might be overkill; the example below uses the unversioned name com.example.Alert.
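Registration is what lets the subscriber in the example assert msg.Payload.(Alert) instead of working with raw JSON. The sketch below shows one plausible way a name-to-type registry can work with encoding/json and reflect, and how versioned names let a handler branch per version. It's an illustration under assumptions, not how keyop-messenger implements RegisterPayloadType; registry, Decode, AlertV1 and AlertV2 are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// registry maps a payload type name to the Go type its JSON decodes into.
type registry map[string]reflect.Type

func (r registry) Register(name string, example any) {
	r[name] = reflect.TypeOf(example)
}

// Decode returns a value of the registered type (not a pointer), so callers
// can use a type assertion like payload.(AlertV1).
func (r registry) Decode(name string, raw []byte) (any, error) {
	t, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("unregistered payload type %q", name)
	}
	ptr := reflect.New(t)
	if err := json.Unmarshal(raw, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil
}

type AlertV1 struct {
	Message string `json:"message"`
}

type AlertV2 struct {
	Message  string `json:"message"`
	Severity string `json:"severity"`
}

func main() {
	reg := registry{}
	reg.Register("com.example.alert.v1", AlertV1{})
	reg.Register("com.example.alert.v2", AlertV2{})

	p, err := reg.Decode("com.example.alert.v2", []byte(`{"message":"disk full","severity":"high"}`))
	if err != nil {
		panic(err)
	}
	// Branch on the concrete type to handle each version.
	switch a := p.(type) {
	case AlertV1:
		fmt.Println("v1:", a.Message)
	case AlertV2:
		fmt.Println("v2:", a.Message, a.Severity)
	}
}
```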
host1
host1 acts as the 'hub', meaning that it listens on a port (7740) and waits for clients to connect. Its only AllowedPeers entry is 'localhost', the same name the instance certs were generated with. No pub or sub channels are defined for that peer, so clients are allowed to send and receive any channels they want.
package main

import (
	"context"
	"log/slog"
	"path/filepath"
	"time"

	messenger "github.com/wu/keyop-messenger"
)

func host1(ctx context.Context, logger *slog.Logger, baseDir string) {
	tmpDir := filepath.Join(baseDir, "host1")
	logger.Info("host1: starting", "dataDir", tmpDir)

	cfg := &messenger.Config{
		Name: "host1",
		Storage: messenger.StorageConfig{
			DataDir: tmpDir,
		},
		Hub: messenger.HubConfig{
			Enabled:    true,
			ListenAddr: ":7740",
			// Only peers named "localhost" may connect; no channel
			// restrictions are set, so they can use any channel.
			AllowedPeers: []messenger.AllowedPeer{
				{
					Name: "localhost",
				},
			},
		},
		TLS: messenger.TLSConfig{
			Cert: filepath.Join(tmpDir, "cert", "host1.crt"),
			Key:  filepath.Join(tmpDir, "cert", "host1.key"),
			CA:   filepath.Join(tmpDir, "cert", "ca.crt"),
		},
	}
	cfg.ApplyDefaults()

	m, err := messenger.New(cfg, messenger.WithLogger(logger))
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := m.Close(); err != nil {
			logger.Error("failed to close messenger", "error", err)
		}
	}()

	// Register payload types for typed decoding.
	if err := m.RegisterPayloadType("com.example.Alert", Alert{}); err != nil {
		panic(err)
	}

	publish := func() {
		// Publish with service identification. Blocks until the write is confirmed to disk.
		pubCtx := messenger.WithServiceName(ctx, "monitor-service")
		logger.Info("host1: publishing message", "service", "monitor-service")
		if err := m.Publish(pubCtx, "alerts", "com.example.Alert", Alert{Message: "system problem!"}); err != nil {
			logger.Error("failed to publish message", "error", err)
		}
	}

	// Publish once immediately, then once per second. Waiting on the ticker
	// and ctx.Done() together lets shutdown happen without waiting out a sleep.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	publish()
	for {
		select {
		case <-ctx.Done():
			logger.Info("host1: shutting down")
			return
		case <-ticker.C:
			publish()
		}
	}
}
host2
host2 acts as the 'client', meaning that it connects to the host1 hub on port 7740. It could subscribe to or publish any number of channels, but in this case it subscribes to the 'alerts' channel on host1, so any messages sent to the 'alerts' channel on host1 are forwarded to the 'alerts' channel on host2.
host2 also registers the payload type for the Alert struct (it's a separate messenger instance from host1, so it needs its own registration), then subscribes to its local 'alerts' channel and waits for alerts to arrive. When an alert arrives, it logs the message.
package main

import (
	"context"
	"log/slog"
	"path/filepath"

	messenger "github.com/wu/keyop-messenger"
)

func host2(ctx context.Context, logger *slog.Logger, baseDir string) {
	tmpDir := filepath.Join(baseDir, "host2")
	logger.Info("host2: starting", "dataDir", tmpDir)

	cfg := &messenger.Config{
		Name: "host2",
		Storage: messenger.StorageConfig{
			DataDir: tmpDir,
		},
		// Connect to the hub and pull its "alerts" channel into the local
		// "alerts" channel.
		Client: messenger.ClientConfig{
			Enabled: true,
			Hubs: []messenger.ClientHubRef{
				{
					Addr: "localhost:7740",
					Subscribe: []string{
						"alerts",
					},
				},
			},
		},
		TLS: messenger.TLSConfig{
			Cert: filepath.Join(tmpDir, "cert", "host2.crt"),
			Key:  filepath.Join(tmpDir, "cert", "host2.key"),
			CA:   filepath.Join(tmpDir, "cert", "ca.crt"),
		},
	}
	cfg.ApplyDefaults()

	logger.Info("host2: creating messenger instance", "config", cfg)
	m, err := messenger.New(cfg, messenger.WithLogger(logger))
	if err != nil {
		logger.Error("failed to create messenger", "error", err)
		panic(err)
	}
	logger.Info("host2: messenger created successfully")
	defer func() {
		if err := m.Close(); err != nil {
			logger.Error("failed to close messenger", "error", err)
		}
	}()

	// Register payload types for typed decoding.
	if err := m.RegisterPayloadType("com.example.Alert", Alert{}); err != nil {
		logger.Error("failed to register payload type", "error", err)
		panic(err)
	}

	// Subscribe to the local "alerts" channel as subscriber "worker-1".
	// Returning nil from the callback marks the message as processed;
	// returning an error would cause it to be retried.
	logger.Info("host2: subscribing to alerts topic on worker-1")
	if err := m.Subscribe(ctx, "alerts", "worker-1", func(_ context.Context, msg messenger.Message) error {
		a, ok := msg.Payload.(Alert)
		if !ok {
			// Log and skip (rather than retry) payloads of an unexpected type.
			logger.Error("failed to cast payload to Alert")
			return nil
		}
		logger.Info("host2: received",
			"message", a.Message,
			"origin", msg.Origin,
			"service", msg.ServiceName,
		)
		return nil
	}); err != nil {
		logger.Error("host2: failed to subscribe", "error", err)
		panic(err)
	}
	logger.Info("host2: subscribed successfully")

	logger.Info("host2: listening for messages from host1")
	<-ctx.Done()
	logger.Info("host2: shutting down")
}
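host2 only pulls channels from the hub, but ClientHubRef also has a Publish list (it shows up as Publish:[] in the config dump in the output below). A hypothetical variation of host2's Client section that also pushes a local 'status' channel up to the hub might look like this fragment. 'status' is an example channel name, and I'm assuming Publish works as the mirror image of Subscribe, as the Features section describes; since host1 sets no channel restrictions for this peer, the hub should accept it.

```go
// Fragment: drop-in replacement for the Client field in host2's config.
Client: messenger.ClientConfig{
	Enabled: true,
	Hubs: []messenger.ClientHubRef{
		{
			Addr:      "localhost:7740",
			Subscribe: []string{"alerts"}, // hub -> local, as in the example
			Publish:   []string{"status"}, // local -> hub ("status" is an example name)
		},
	},
},
```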
Running the Example
You can find this code in the example subdirectory of keyop-messenger, along with a 'run.sh' script that automates generating the necessary certs and starting both instances locally. Start it with run.sh; once it's running, you can shut it down by hitting ctrl+c. Here's an example of the output:
--(wu@navi:#)-- ./run.sh -[main]-
Generating CA keys
Subject: CN=keyop-ca
Valid from: 2026-04-24T22:44:48Z
Valid until: 2028-04-23T22:44:48Z
Fingerprint: SHA-256:5cba1feb1897e807487c1a4928c381508f120d248da078a7b29489514b5b45ac
Generating host1 keys
Subject: CN=localhost
Valid from: 2026-04-24T22:44:48Z
Valid until: 2028-04-23T22:44:48Z
Fingerprint: SHA-256:2ba046337b3cf349ff74cfb7f6e9f9ebf5f61505d1c6088d156fe61deae3a8ef
Generating host2 keys
Subject: CN=localhost
Valid from: 2026-04-24T22:44:48Z
Valid until: 2028-04-23T22:44:48Z
Fingerprint: SHA-256:b0223424f019815108e154012bff0932f486adab9ded98f906e88a89fa46aede
Building example
Running example
2026/04/24 15:44:48 INFO host2: starting dataDir=/tmp/keyop-messenger-example/host2
2026/04/24 15:44:48 INFO host1: starting dataDir=/tmp/keyop-messenger-example/host1
2026/04/24 15:44:48 INFO host2: creating messenger instance config="&{Name:host2 Storage:{DataDir:/tmp/keyop-messenger-example/host2 SyncPolicy:periodic SyncIntervalMS:200 MaxSubscriberLagMB:512 CompactionThresholdMB:256 OffsetFlushIntervalMS:0} Subscribers:{MaxRetries:0x7365c2822060} Hub:{Enabled:false ListenAddr: AllowedPeers:[] FedClientOffsetTTL:168h0m0s} Client:{Enabled:true Hubs:[{Addr:localhost:7740 Subscribe:[alerts] Publish:[]}]} TLS:{Cert:/tmp/keyop-messenger-example/host2/cert/host2.crt Key:/tmp/keyop-messenger-example/host2/cert/host2.key CA:/tmp/keyop-messenger-example/host2/cert/ca.crt MinVersion:1.3 ExpiryWarnDays:30} Federation:{ReconnectBaseMS:500 ReconnectMaxMS:60000 ReconnectJitter:0.2 SendBufferMessages:10000 MaxBatchBytes:4194304} Dedup:{SeenIDLRUSize:100000} Audit:{MaxSizeMB:100 MaxFiles:10}}"
2026/04/24 15:44:48 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:48 INFO host2: messenger created successfully
2026/04/24 15:44:48 INFO host2: subscribing to alerts topic on worker-1
2026/04/24 15:44:48 INFO federation: hub accepted connection peer=localhost addr=127.0.0.1:56479
2026/04/24 15:44:48 INFO host2: subscribed successfully
2026/04/24 15:44:48 INFO host2: listening for messages from host1
2026/04/24 15:44:49 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:49 INFO host2: received message="system problem!" origin=host1 service=monitor-service
2026/04/24 15:44:50 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:50 INFO host2: received message="system problem!" origin=host1 service=monitor-service
2026/04/24 15:44:51 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:51 INFO host2: received message="system problem!" origin=host1 service=monitor-service
2026/04/24 15:44:52 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:52 INFO host2: received message="system problem!" origin=host1 service=monitor-service
... I hit ctrl+c here ...
^C2026/04/24 15:44:52 INFO host2: shutting down
2026/04/24 15:44:52 ERROR federation: receiver read error peer=localhost:7740 err="read tcp 127.0.0.1:56479->127.0.0.1:7740: use of closed network connection"
2026/04/24 15:44:52 ERROR federation: receiver read error peer=localhost err="websocket: close 1006 (abnormal closure): unexpected EOF"
2026/04/24 15:44:53 INFO host1: shutting down
2026/04/24 15:44:53 WARN removing temp directory path=/tmp/keyop-messenger-example