geekfarm

Introducing Keyop Messenger

What is Keyop Messenger?

keyop-messenger is a messaging library for Go. I created it to be the nervous system for 'keyop', the latest generation of a project I've been working on since 1999.

Why create another messaging library in Go?

The previous generation of keyop used Kafka for node-to-node communication, but running Kafka in the cloud for a personal project was expensive, and I had to use a different solution for intra-node communication that could tolerate being offline.

Ideally, I wanted a messaging system with these properties:

Here are some things that keyop-messenger does NOT try to solve:

Go has lots of great messaging libraries, but nothing quite lined up with my requirements.

Features

keyop-messenger is a pub-sub messaging library.

It's small (under 10k lines of code), very lightweight, and will run comfortably on an old Raspberry Pi.

Similar to Kafka, it provides named channels that services can use to communicate. Processes can easily publish messages to one or more channels, and/or subscribe to one or more channels to receive messages that are being published.

When a message gets published to a channel, it gets written to a file on disk. If the publisher gets back a successful response from Publish(), that means the message was successfully committed to disk.
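To make "committed to disk" concrete, here is a rough sketch of a durable append using only the standard library. It is not keyop-messenger's implementation: the `appendDurable` helper, the file name, and the record shape are all made up for illustration. It also syncs on every write, which is the simplest possible policy; the config dump later in this post shows a `SyncPolicy:periodic` default, so the library may well batch its syncs instead.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// appendDurable (hypothetical helper) appends one JSON line to path and
// returns only after the file has been synced to stable storage.
func appendDurable(path string, record any) error {
	line, err := json.Marshal(record)
	if err != nil {
		return err
	}
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err := f.Write(append(line, '\n')); err != nil {
		return err
	}
	// A nil error from Sync is the "committed" signal in this sketch.
	return f.Sync()
}

// countLines reports how many lines the file at path contains.
func countLines(path string) (int, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	n := 0
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		n++
	}
	return n, sc.Err()
}

// demoAppend writes two records to a throwaway file and returns the line count.
func demoAppend() (int, error) {
	dir, err := os.MkdirTemp("", "append-sketch")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	logPath := filepath.Join(dir, "alerts.jsonl") // assumed file name
	for _, msg := range []string{"first", "second"} {
		if err := appendDurable(logPath, map[string]string{"message": msg}); err != nil {
			return 0, err
		}
	}
	return countLines(logPath)
}

func main() {
	n, err := demoAppend()
	if err != nil {
		fmt.Println("append failed:", err)
		return
	}
	fmt.Println("lines on disk:", n)
}
```

The trade-off is the usual one: syncing per message gives the strongest guarantee at the cost of throughput, while periodic syncing is faster but leaves a small window where acknowledged data could be lost on power failure.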

Subscribers of a channel follow the channel log file for new messages. Each subscriber registers a callback function that gets called to process each message. If the callback returns with no error, the message is considered successfully processed, and the subscriber's offset in the channel file is updated. If the callback returns an error, it gets retried a few times with exponential back-off. If it continues to fail, the message eventually gets moved to the dead-letter queue.

If the application crashes or gets restarted after a message was successfully published but before subscribers finish processing it, the message will still be waiting in the channel file and will get picked up by subscribers on restart. In edge cases like this a message might be picked up twice, but assuming the disk is reliable, the message is guaranteed to be delivered at least once to all subscribers.
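The retry behavior described above can be sketched in a few lines. This is an illustration of the general pattern (retry with doubling delay, then dead-letter), not the library's code. The `Handler` type, `deliver`, `failNTimes`, and `runDemo` are hypothetical names, and the retry count and base delay are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Handler mimics a subscriber callback: returning nil means "processed".
type Handler func(ctx context.Context, msg string) error

// deliver calls h until it succeeds or maxRetries retries are used up,
// doubling the delay after each failure. When retries are exhausted the
// message is handed to deadLetter. If ctx is cancelled while waiting, it
// returns without dead-lettering, so the message stays unacknowledged.
func deliver(ctx context.Context, msg string, h Handler, maxRetries int,
	base time.Duration, deadLetter func(string)) (attempts int, ok bool) {
	delay := base
	for attempts = 1; ; attempts++ {
		if err := h(ctx, msg); err == nil {
			return attempts, true
		}
		if attempts > maxRetries {
			deadLetter(msg)
			return attempts, false
		}
		select {
		case <-ctx.Done():
			return attempts, false
		case <-time.After(delay):
		}
		delay *= 2
	}
}

// failNTimes returns a handler that fails its first n calls, then succeeds.
func failNTimes(n int) Handler {
	calls := 0
	return func(context.Context, string) error {
		calls++
		if calls <= n {
			return errors.New("transient failure")
		}
		return nil
	}
}

// runDemo delivers one message and reports the number of attempts, whether
// it was processed, and how many messages were dead-lettered.
func runDemo(failures, maxRetries int) (attempts int, ok bool, dead int) {
	attempts, ok = deliver(context.Background(), "system problem!",
		failNTimes(failures), maxRetries, time.Millisecond,
		func(string) { dead++ })
	return attempts, ok, dead
}

func main() {
	fmt.Println(runDemo(2, 3))  // prints: 3 true 0
	fmt.Println(runDemo(10, 3)) // prints: 4 false 1
}
```

Because a handler may run more than once for the same message (after a retry or a crash), it helps to make handlers idempotent.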

The actual channel logs are stored in a directory of fixed-size append-only segment files. Writes to these channel files are atomic. The messaging library tracks the offsets of each client so that clients can resume where they left off. Old segment files get cleaned up once all clients have consumed all the messages in the file, or after some configurable maximum amount of time has passed.
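As a rough illustration of the cleanup rule (a segment goes away once every client has read past it, or once it is older than some maximum age), here is a small sketch. The `Segment` type, its fields, the segment names, and the seven-day limit are assumptions made for the example and are not taken from the library.

```go
package main

import (
	"fmt"
	"time"
)

// Segment is a hypothetical description of one closed segment file.
type Segment struct {
	Name      string
	EndOffset int64     // offset just past the last message in the segment
	ClosedAt  time.Time // when the segment stopped receiving writes
}

// removable reports whether seg can be deleted: either it is older than
// maxAge, or every known consumer offset is at or past its end.
func removable(seg Segment, offsets map[string]int64, maxAge time.Duration, now time.Time) bool {
	if maxAge > 0 && now.Sub(seg.ClosedAt) >= maxAge {
		return true
	}
	if len(offsets) == 0 {
		return false // nobody has consumed anything yet
	}
	for _, off := range offsets {
		if off < seg.EndOffset {
			return false // at least one consumer still needs this segment
		}
	}
	return true
}

// demoCleanup returns the names of segments that are fully consumed.
func demoCleanup() []string {
	now := time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC)
	offsets := map[string]int64{"worker-1": 250, "worker-2": 150}
	segments := []Segment{
		{Name: "0001.jsonl", EndOffset: 100, ClosedAt: now.Add(-2 * time.Hour)},
		{Name: "0002.jsonl", EndOffset: 200, ClosedAt: now.Add(-1 * time.Hour)},
	}
	var out []string
	for _, s := range segments {
		if removable(s, offsets, 7*24*time.Hour, now) {
			out = append(out, s.Name)
		}
	}
	return out
}

// demoAgeExpiry shows the age rule overriding a lagging consumer.
func demoAgeExpiry() bool {
	now := time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC)
	old := Segment{Name: "old.jsonl", EndOffset: 100, ClosedAt: now.Add(-200 * time.Hour)}
	return removable(old, map[string]int64{"slow-worker": 10}, 7*24*time.Hour, now)
}

func main() {
	fmt.Println(demoCleanup())   // prints: [0001.jsonl]
	fmt.Println(demoAgeExpiry()) // prints: true
}
```

The age limit is what keeps a permanently offline client from pinning old segments on disk forever.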

Messaging is also federated, meaning that keyop-messenger running on a client host can subscribe to or publish select channels to a hub. A given instance can easily be a client or a hub or both depending on the configuration. Messaging between hosts is secured by self-signed mTLS certificates. keyop-messenger comes with a command line utility to generate the certs. The hub configuration controls which clients can connect, and which channels they can publish to and subscribe to.

Federated messaging is low latency. Imagine a client publishes a message locally, that message gets delivered to a hub on a different host, and then another client on a third host picks up that message from the hub. If you are actively watching the channel logs on all three hosts at the same time, the message will typically arrive so quickly in all three logs that you'll have trouble distinguishing the source from the destination.

Debugging messaging problems is easy, because you can just look at the channel log files (.jsonl format) to see whether or not messages are arriving, and you can watch the offsets to see if the messages are being picked up. If you don't see the messages you expect, you can check the audit log for more details on connections and inbound messages.
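Since the channel logs are plain .jsonl, a few lines of Go are enough to inspect one when eyeballing the file is not enough. The sketch below decodes each line into a generic map because it makes no assumption about the field names the library writes. `readJSONL` and `countRecords` are hypothetical helpers, and the path is whatever you pass on the command line.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
)

// readJSONL decodes every non-empty line of r as a JSON object. On a
// malformed line it returns the records read so far plus an error.
func readJSONL(r io.Reader) ([]map[string]any, error) {
	var records []map[string]any
	sc := bufio.NewScanner(r)
	// Raise the scanner's line limit; the default of 64 KiB can be too small.
	sc.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
	lineNo := 0
	for sc.Scan() {
		lineNo++
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue
		}
		var rec map[string]any
		if err := json.Unmarshal([]byte(line), &rec); err != nil {
			return records, fmt.Errorf("line %d: %w", lineNo, err)
		}
		records = append(records, rec)
	}
	return records, sc.Err()
}

// countRecords is a convenience wrapper for in-memory input.
func countRecords(s string) (int, error) {
	recs, err := readJSONL(strings.NewReader(s))
	return len(recs), err
}

func main() {
	// With no argument, fall back to a one-line sample so the program runs.
	var in io.Reader = strings.NewReader("{\"message\":\"system problem!\"}\n")
	if len(os.Args) > 1 {
		f, err := os.Open(os.Args[1])
		if err != nil {
			fmt.Println("open:", err)
			return
		}
		defer f.Close()
		in = f
	}
	recs, err := readJSONL(in)
	if err != nil {
		fmt.Println("read:", err)
	}
	for i, rec := range recs {
		fmt.Printf("%d: %v\n", i+1, rec)
	}
}
```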

Getting Started

Here's an example of how to use the library.

Certificates

keyop-messenger uses "mutual authentication", or mTLS, meaning that the client authenticates the server (hub), and the server authenticates the client. Authentication involves checking that the peer's certificate was signed by the correct certificate authority (CA). Each instance has its own certificate which uniquely identifies that instance. Having each instance identified by a unique certificate allows the server to limit which channels that client can subscribe to and publish to.
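For readers who haven't set up mTLS before, this is roughly what mutual authentication looks like with Go's standard `crypto/tls` package. It is a generic sketch, not the way keyop-messenger builds its TLS settings internally. `loadCAPool` and `mutualTLSConfig` are hypothetical helpers, and the file names are placeholders.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// loadCAPool builds a certificate pool from PEM-encoded CA certificates.
func loadCAPool(pemBytes []byte) (*x509.CertPool, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pemBytes) {
		return nil, errors.New("no CA certificates found in PEM data")
	}
	return pool, nil
}

// mutualTLSConfig loads this instance's key pair and the shared CA, and
// returns a config that both presents a certificate and demands one.
func mutualTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load key pair: %w", err)
	}
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read CA: %w", err)
	}
	pool, err := loadCAPool(caPEM)
	if err != nil {
		return nil, err
	}
	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,                           // verify the hub when dialing out
		ClientCAs:    pool,                           // verify clients when accepting
		ClientAuth:   tls.RequireAndVerifyClientCert, // reject peers without a valid cert
		MinVersion:   tls.VersionTLS13,
	}, nil
}

func main() {
	// Placeholder paths; the call fails cleanly if the files do not exist.
	if _, err := mutualTLSConfig("host1.crt", "host1.key", "ca.crt"); err != nil {
		fmt.Println("could not build TLS config:", err)
		return
	}
	fmt.Println("TLS config ready")
}
```

The key line is `ClientAuth: tls.RequireAndVerifyClientCert`: without it, a Go TLS server only proves its own identity and accepts any client.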

So, we need to start by generating some self-signed certificates. keyop-messenger has a 'keygen' command for generating the necessary certs.

Before generating host certs, we need to create a CA to sign the certs. This command will generate the ca.crt and ca.key in the current working directory.

keyop-messenger keygen ca

Now we need to create certs for the instances. This example uses two messenger instances, but they are both running on the same host, so we'll use 'localhost' for the hostname. Again, these are generated in the current working directory.

# host 1
keyop-messenger keygen instance \
    --ca ca.crt --ca-key ca.key \
    --name localhost \
    --out-cert host1.crt \
    --out-key  host1.key

# host 2
keyop-messenger keygen instance \
    --ca ca.crt --ca-key ca.key \
    --name localhost \
    --out-cert host2.crt \
    --out-key  host2.key

Now that we have the certs, we need to copy them to host1 and host2, along with the 'ca.crt' which will be used to validate that the certs have been signed by the same CA. Do not copy ca.key to the hosts, or someone could use it to generate more certs. Keep ca.key safe and don't lose track of it in case you want to create more instance certs in the future. It's best that every instance has its own cert so you can control which channels that specific instance can publish to and subscribe to.

On the instance hosts, you can keep the cert files anywhere you want--you just need to adjust the configuration for where you plan to keep them.

Payload Registration

In order to publish data, you need to start by creating structs for the data you want to send. The example below uses an 'Alert'.

type Alert struct {
        Message string `json:"message"`
}

Before sending or receiving the message, it is necessary to register the payload with the messenger. Here's how to register a payload.

err := m.RegisterPayloadType("com.example.alert.v1", Alert{})

It is typically a good practice to version your payloads if you expect that they might change over time, and if you may ever have multiple versions of the payload in flight. Then in the code that handles the payloads, you can check the payload version and handle it appropriately. For small projects, this might be overkill.
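One way to handle multiple versions in a subscriber is a type switch on the decoded payload, in the same spirit as the `msg.Payload.(Alert)` assertion in the example further down. The sketch below stands alone and does not call the library. `AlertV1`, `AlertV2`, the `Severity` field, and `describe` are hypothetical names, and it assumes each version is registered as its own Go struct.

```go
package main

import "fmt"

// AlertV1 is the original payload shape.
type AlertV1 struct {
	Message string `json:"message"`
}

// AlertV2 is an assumed later revision that adds a severity.
type AlertV2 struct {
	Message  string `json:"message"`
	Severity string `json:"severity"`
}

// describe renders any known alert version as a single line of text and
// returns an error for payloads it does not recognize.
func describe(payload any) (string, error) {
	switch p := payload.(type) {
	case AlertV1:
		return p.Message, nil
	case AlertV2:
		return "[" + p.Severity + "] " + p.Message, nil
	default:
		return "", fmt.Errorf("unsupported payload type %T", payload)
	}
}

func main() {
	payloads := []any{
		AlertV1{Message: "disk full"},
		AlertV2{Message: "disk full", Severity: "critical"},
		42, // not an alert
	}
	for _, p := range payloads {
		s, err := describe(p)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(s)
	}
}
```

Returning an error for unknown types, rather than silently ignoring them, makes it obvious when a publisher has started sending a version the subscriber does not understand yet.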

host1

host1 will be acting as the 'hub', meaning that it listens on a port (7740) and waits for clients to connect. In this example, no publish or subscribe channels are defined in AllowedPeers, so clients are allowed to send and receive any channels they want.

package main

import (
	"context"
	"log/slog"
	"path"
	"time"

	messenger "github.com/wu/keyop-messenger"
)

func host1(ctx context.Context, logger *slog.Logger, baseDir string) {

	tmpDir := path.Join(baseDir, "host1")
	logger.Info("host1: starting", "dataDir", tmpDir)

	cfg := &messenger.Config{
		Name: "host1",
		Storage: messenger.StorageConfig{
			DataDir: tmpDir,
		},
		Hub: messenger.HubConfig{
			Enabled:    true,
			ListenAddr: ":7740",
			AllowedPeers: []messenger.AllowedPeer{
				{
					Name: "localhost",
				},
			},
		},
		TLS: messenger.TLSConfig{
			Cert: path.Join(tmpDir, "cert", "host1.crt"),
			Key:  path.Join(tmpDir, "cert", "host1.key"),
			CA:   path.Join(tmpDir, "cert", "ca.crt"),
		},
	}
	cfg.ApplyDefaults()

	m, err := messenger.New(cfg, messenger.WithLogger(logger))
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := m.Close(); err != nil {
			logger.Error("failed to close messenger", "error", err)
		}
	}()

	// Register payload types for typed decoding.
	if err := m.RegisterPayloadType("com.example.Alert", Alert{}); err != nil {
		panic(err)
	}

	for {
		select {
		case <-ctx.Done():
			logger.Info("host1: shutting down")
			return
		default:
			// Publish with service identification. Blocks until write is confirmed to disk.
			pubCtx := messenger.WithServiceName(ctx, "monitor-service")
			logger.Info("host1: publishing message", "service", "monitor-service")
			if err := m.Publish(pubCtx, "alerts", "com.example.Alert", Alert{Message: "system problem!"}); err != nil {
				logger.Error("failed to publish message", "error", err)
			}
			time.Sleep(time.Second)
		}
	}
}

host2

host2 is acting as the 'client', meaning that it connects to the host1 hub on port 7740. It could subscribe to or publish any number of channels, but in this case it subscribes to the 'alerts' channel on host1, so any messages that get sent to the 'alerts' channel on host1 will get forwarded to the 'alerts' channel on host2.

host2 also registers a payload type for the Alert struct (since this is a different messenger instance communicating with the messenger instance for host1), and then subscribes to the 'alerts' channel and waits for alerts to arrive. When an alert arrives, it just logs the message.

package main

import (
	"context"
	"log/slog"
	"path"

	messenger "github.com/wu/keyop-messenger"
)

func host2(ctx context.Context, logger *slog.Logger, baseDir string) {

	tmpDir := path.Join(baseDir, "host2")
	logger.Info("host2: starting", "dataDir", tmpDir)

	cfg := &messenger.Config{
		Name: "host2",
		Storage: messenger.StorageConfig{
			DataDir: tmpDir,
		},
		Client: messenger.ClientConfig{
			Enabled: true,
			Hubs: []messenger.ClientHubRef{
				{
					Addr: "localhost:7740",
					Subscribe: []string{
						"alerts",
					},
				},
			},
		},
		TLS: messenger.TLSConfig{
			Cert: path.Join(tmpDir, "cert", "host2.crt"),
			Key:  path.Join(tmpDir, "cert", "host2.key"),
			CA:   path.Join(tmpDir, "cert", "ca.crt"),
		},
	}
	cfg.ApplyDefaults()

	logger.Info("host2: creating messenger instance", "config", cfg)
	m, err := messenger.New(cfg, messenger.WithLogger(logger))
	if err != nil {
		logger.Error("failed to create messenger", "error", err)
		panic(err)
	}
	logger.Info("host2: messenger created successfully")
	defer func() {
		if err := m.Close(); err != nil {
			logger.Error("failed to close messenger", "error", err)
		}
	}()

	// Register payload types for typed decoding.
	if err := m.RegisterPayloadType("com.example.Alert", Alert{}); err != nil {
		logger.Error("failed to register payload type", "error", err)
		panic(err)
	}

	// Subscribe before publishing so the handler sees the message.
	logger.Info("host2: subscribing to alerts topic on worker-1")
	if err := m.Subscribe(ctx, "alerts", "worker-1", func(_ context.Context, msg messenger.Message) error {
		a, ok := msg.Payload.(Alert)
		if !ok {
			logger.Error("failed to cast payload to Alert")
			return nil
		}
		logger.Info("host2: received",
			"message", a.Message,
			"origin", msg.Origin,
			"service", msg.ServiceName,
		)
		return nil
	}); err != nil {
		logger.Error("host2: failed to subscribe", "error", err)
		panic(err)
	}
	logger.Info("host2: subscribed successfully")

	logger.Info("host2: listening for messages from host1")
	<-ctx.Done()
	logger.Info("host2: shutting down")
}

Running the Example

You can access the code in the example subdirectory of keyop-messenger. It contains an easy-to-use script to automate generating the necessary certs and starting the instances locally.

Start it up using the 'run.sh' script. Once it's running, you can shut it down by hitting ctrl+c. Here's an example of the output:

--(wu@navi:#)-- ./run.sh
Generating CA keys
Subject:     CN=keyop-ca
Valid from:  2026-04-24T22:44:48Z
Valid until: 2028-04-23T22:44:48Z
Fingerprint: SHA-256:5cba1feb1897e807487c1a4928c381508f120d248da078a7b29489514b5b45ac
Generating host1 keys
Subject:     CN=localhost
Valid from:  2026-04-24T22:44:48Z
Valid until: 2028-04-23T22:44:48Z
Fingerprint: SHA-256:2ba046337b3cf349ff74cfb7f6e9f9ebf5f61505d1c6088d156fe61deae3a8ef
Generating host2 keys
Subject:     CN=localhost
Valid from:  2026-04-24T22:44:48Z
Valid until: 2028-04-23T22:44:48Z
Fingerprint: SHA-256:b0223424f019815108e154012bff0932f486adab9ded98f906e88a89fa46aede
Building example
Running example
2026/04/24 15:44:48 INFO host2: starting dataDir=/tmp/keyop-messenger-example/host2
2026/04/24 15:44:48 INFO host1: starting dataDir=/tmp/keyop-messenger-example/host1
2026/04/24 15:44:48 INFO host2: creating messenger instance config="&{Name:host2 Storage:{DataDir:/tmp/keyop-messenger-example/host2 SyncPolicy:periodic SyncIntervalMS:200 MaxSubscriberLagMB:512 CompactionThresholdMB:256 OffsetFlushIntervalMS:0} Subscribers:{MaxRetries:0x7365c2822060} Hub:{Enabled:false ListenAddr: AllowedPeers:[] FedClientOffsetTTL:168h0m0s} Client:{Enabled:true Hubs:[{Addr:localhost:7740 Subscribe:[alerts] Publish:[]}]} TLS:{Cert:/tmp/keyop-messenger-example/host2/cert/host2.crt Key:/tmp/keyop-messenger-example/host2/cert/host2.key CA:/tmp/keyop-messenger-example/host2/cert/ca.crt MinVersion:1.3 ExpiryWarnDays:30} Federation:{ReconnectBaseMS:500 ReconnectMaxMS:60000 ReconnectJitter:0.2 SendBufferMessages:10000 MaxBatchBytes:4194304} Dedup:{SeenIDLRUSize:100000} Audit:{MaxSizeMB:100 MaxFiles:10}}"
2026/04/24 15:44:48 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:48 INFO host2: messenger created successfully
2026/04/24 15:44:48 INFO host2: subscribing to alerts topic on worker-1
2026/04/24 15:44:48 INFO federation: hub accepted connection peer=localhost addr=127.0.0.1:56479
2026/04/24 15:44:48 INFO host2: subscribed successfully
2026/04/24 15:44:48 INFO host2: listening for messages from host1
2026/04/24 15:44:49 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:49 INFO host2: received message="system problem!" origin=host1 service=monitor-service
2026/04/24 15:44:50 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:50 INFO host2: received message="system problem!" origin=host1 service=monitor-service
2026/04/24 15:44:51 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:51 INFO host2: received message="system problem!" origin=host1 service=monitor-service
2026/04/24 15:44:52 INFO host1: publishing message service=monitor-service
2026/04/24 15:44:52 INFO host2: received message="system problem!" origin=host1 service=monitor-service

... I hit ctrl+c here ...

^C2026/04/24 15:44:52 INFO host2: shutting down
2026/04/24 15:44:52 ERROR federation: receiver read error peer=localhost:7740 err="read tcp 127.0.0.1:56479->127.0.0.1:7740: use of closed network connection"
2026/04/24 15:44:52 ERROR federation: receiver read error peer=localhost err="websocket: close 1006 (abnormal closure): unexpected EOF"
2026/04/24 15:44:53 INFO host1: shutting down
2026/04/24 15:44:53 WARN removing temp directory path=/tmp/keyop-messenger-example