geekfarm

Introducing Keyop Messenger

keyop-messenger is a fast, lightweight, federated pub-sub messaging system with at-least-once delivery guarantees, offline resilience, and mTLS security. It was designed to run efficiently on resource-constrained systems with minimal operational complexity.

I created it to be the nervous system for 'keyop', the latest generation of a distributed application I've been working on since 1999.

This post was not created or edited by AI.


Why create another messaging library in Go?

The previous generation of keyop used Kafka for node-to-node communication, but running Kafka in the cloud for my personal project was expensive, and I had to use a different solution for intra-node communication that could tolerate being offline.

Ideally, I wanted a messaging system with these properties:

Here are some things that keyop-messenger does NOT try to solve:

Go has lots of great messaging libraries, but nothing quite lined up with my requirements.

Features

keyop-messenger is a pub-sub messaging library.

It's small (~5k lines of code and ~10k lines of tests), very lightweight, and will run comfortably on an old Raspberry Pi.

Similar to Kafka, it provides named channels that services can use to communicate. Processes can easily publish and/or subscribe to one or more channels.

When a message gets published to a channel, it gets written to a file on disk. If the publisher gets back a successful response from Publish(), that means the message was successfully committed to disk.

Subscribers of a channel follow the channel log file for new messages. Each subscriber registers a callback function which gets called to process each message. If the callback returns with no error, the message is considered successfully processed, and the subscriber offset in the channel file is updated. If the callback returns an error, it will get retried a few times with exponential back-off. If it continues to fail, the message will eventually get moved to the dead-letter queue.

If the application crashes or gets restarted after a message was successfully published, but before subscribers can finish processing it, then the message will still be waiting in the channel file, and will get picked up by subscribers on restart. There's a chance a message might be picked up twice in edge cases like this, but assuming the disk is reliable, the message is guaranteed to be delivered at least once to all subscribers.
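To make the retry behavior concrete, here is a minimal standalone sketch of a retry loop with exponential back-off and a dead-letter outcome. It is not keyop-messenger's implementation: the deliver function, the handler type, the maxRetries and base parameters, and errTemporary are all hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary stands in for any error a subscriber callback might return.
var errTemporary = errors.New("temporary failure")

// handler processes one message; returning nil acknowledges it.
type handler func(msg string) error

// deliver calls h until it succeeds or maxRetries retries have been used.
// It returns true when the message was acknowledged and false when the
// caller should move it to a dead-letter queue. The wait doubles after
// every failed attempt (exponential back-off).
func deliver(msg string, h handler, maxRetries int, base time.Duration) bool {
	delay := base
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err := h(msg); err == nil {
			return true
		}
		if attempt < maxRetries {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return false
}

func main() {
	calls := 0
	flaky := func(string) error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	}
	acked := deliver("system problem!", flaky, 5, time.Millisecond)
	fmt.Println("acked:", acked, "attempts:", calls)

	broken := func(string) error { return errTemporary }
	if !deliver("poison message", broken, 2, time.Millisecond) {
		fmt.Println("retries exhausted: move to dead-letter queue")
	}
}
```

Because a callback can run more than once for the same message, handlers are safest when they are idempotent.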

The actual channel logs are stored in a directory of fixed-size append-only segment files. Writes to these channel files are atomic. The messaging library tracks the offsets of each client so that clients can resume where they left off. Old segment files get cleaned up once all clients have consumed all the messages in the file, or after some configurable maximum amount of time has passed.
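The idea of resuming from a stored offset can be sketched with a single append-only file. This is only an illustration under simplifying assumptions: one file instead of fixed-size segments, a byte offset held in memory, and hypothetical helper names (appendLine, readFrom, demo). The real on-disk layout may differ.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// appendLine appends one record to the end of the log file.
func appendLine(path, line string) error {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}
	if _, err := f.WriteString(line + "\n"); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}

// readFrom returns every complete line at or after the byte offset, plus
// the offset to store once those lines have been processed.
func readFrom(path string, offset int64) ([]string, int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, offset, err
	}
	defer f.Close()
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, offset, err
	}
	var lines []string
	r := bufio.NewReader(f)
	for {
		line, err := r.ReadString('\n')
		if err == io.EOF {
			return lines, offset, nil // a partial trailing line waits for the next read
		}
		if err != nil {
			return lines, offset, err
		}
		offset += int64(len(line))
		lines = append(lines, line[:len(line)-1])
	}
}

// demo writes two records, reads them, appends a third, and then resumes
// from the saved offset so only the new record is returned.
func demo() ([]string, []string, error) {
	dir, err := os.MkdirTemp("", "channel-demo")
	if err != nil {
		return nil, nil, err
	}
	defer os.RemoveAll(dir)
	logPath := filepath.Join(dir, "alerts.jsonl")

	for _, rec := range []string{`{"n":1}`, `{"n":2}`} {
		if err := appendLine(logPath, rec); err != nil {
			return nil, nil, err
		}
	}
	first, offset, err := readFrom(logPath, 0)
	if err != nil {
		return nil, nil, err
	}
	if err := appendLine(logPath, `{"n":3}`); err != nil {
		return nil, nil, err
	}
	second, _, err := readFrom(logPath, offset)
	return first, second, err
}

func main() {
	first, second, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("first read:", first)
	fmt.Println("after resume:", second)
}
```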

Messaging is also federated, meaning that keyop-messenger running on a client host can subscribe to select channels on a hub, or publish select channels to it. A given instance can easily be a client or a hub or both depending on the configuration. Messaging between hosts is secured by self-signed mTLS certificates. keyop-messenger comes with a command line utility to generate the certs. The hub configuration controls which clients can connect, and which channels they can publish and subscribe to.

Federated messaging is low latency. Imagine a client publishes a message locally, and that message gets delivered to a hub on a different host, and then another client on a 3rd host picks up that message from the hub. If you are actively watching the channel logs on all three hosts at the same time, it will typically arrive so quickly in all three logs that you'll have a problem distinguishing the source from the destination.

Debugging messaging problems is easy, because you can just look at the channel log files (.jsonl format) to see whether or not messages are arriving, and you can watch the offsets to see if the messages are being picked up. If you don't see the messages you expect, you can check the audit log for more details on connections and inbound messages.

Getting Started

Here's an example of how to use the library to communicate between two hosts.

The code for the example lives in the 'example' subdirectory of the project:

Certificates

keyop-messenger uses "mutual authentication", or mTLS, meaning that the client authenticates the server (hub), and the server authenticates the client. Authentication involves checking that the peer's certificate was signed by the correct certificate authority (CA). Each instance has its own certificate which uniquely identifies that instance. Having each instance identified by a unique certificate allows the server to limit which channels that client can subscribe to and publish to.
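For readers who have not set up mTLS in Go before, this is roughly what the server side looks like with the standard library's crypto/tls package. It is a generic sketch, not keyop-messenger's internal code. The hubTLSConfig and peerName helpers are hypothetical, and matching peers by certificate common name is an assumption.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// hubTLSConfig builds a server-side TLS config that rejects any client
// whose certificate was not signed by the CA in caFile.
func hubTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load instance cert: %w", err)
	}
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read CA cert: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificates found in " + caFile)
	}
	return &tls.Config{
		Certificates: []tls.Certificate{cert},       // this instance's identity
		ClientCAs:    pool,                           // CA used to verify clients
		ClientAuth:   tls.RequireAndVerifyClientCert, // the "mutual" part of mTLS
		MinVersion:   tls.VersionTLS13,
	}, nil
}

// peerName returns the common name from the client certificate, which a
// hub could look up in its list of allowed peers (an assumption here).
func peerName(state tls.ConnectionState) string {
	if len(state.PeerCertificates) == 0 {
		return ""
	}
	return state.PeerCertificates[0].Subject.CommonName
}

func main() {
	cfg, err := hubTLSConfig("host1.crt", "host1.key", "ca.crt")
	if err != nil {
		fmt.Println("could not build TLS config:", err)
		return
	}
	fmt.Println("client certificates required:", cfg.ClientAuth == tls.RequireAndVerifyClientCert)
	fmt.Println("peer name before any handshake:", peerName(tls.ConnectionState{}))
}
```

The client side mirrors this: it presents its own certificate and sets RootCAs to the same CA pool so it can verify the hub.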

So, we need to start by generating some self-signed certificates. keyop-messenger has a 'keygen' command for generating the necessary certs.

Before generating host certs, we need to create a CA to sign the certs. This command will generate the ca.crt and ca.key in the current working directory.

keyop-messenger keygen ca

Now we need to create certs for the instances. This example uses two messenger instances, but they are both running on the same host, so we need to use 'localhost' for the hostname. Again, these are generated in the current working directory.

# host 1
keyop-messenger keygen instance \
    --ca ca.crt --ca-key ca.key \
    --name localhost \
    --out-cert host1.crt \
    --out-key  host1.key

# host 2
keyop-messenger keygen instance \
    --ca ca.crt --ca-key ca.key \
    --name localhost \
    --out-cert host2.crt \
    --out-key  host2.key

Now that we have the certs, we need to copy them to host1 and host2, along with the 'ca.crt' which will be used to validate that the certs have been signed by the same CA. Do not copy ca.key to the hosts, or someone could use it to generate more certs. Keep ca.key safe and don't lose track of it in case you want to create more instance certs in the future. It's best that every instance has its own cert so you can control which channels that specific instance can publish and subscribe to.

On the instance hosts, you can keep the cert files anywhere you want--you just need to adjust the configuration for where you plan to keep them.

Payload Registration

keyop-messenger handles serialization and deserialization of Go structs in message payloads. You just need to register the payload types you want to publish and receive. To start, create a struct for the message you want to send. Here's a simple example.

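type Alert struct {
        Details string `json:"message"`
        // Count is set and logged by the federation example below; its JSON tag is an assumption.
        Count   int    `json:"count"`
}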

Then just register the payload type with the messenger before sending or receiving the message. You can register as many payload types as you need. Here's how to register one:

err := m.RegisterPayloadType("com.example.alert.v1", Alert{})

When publishing the message, declare the payload type:

alert := Alert{Details: "system problem!"}
err := m.Publish(ctx, "alerts", "com.example.alert.v1", alert)

And then when a subscriber receives the message, check the payload type:

if msg.PayloadType == "com.example.alert.v1" {
	// safely verify the payload type before using it
	a, ok := msg.Payload.(Alert)
	if !ok {
		logger.Error("failed to verify Alert payload type")
		return nil
	}
	// log the event details with event metadata from the message
	logger.Info("received alert!",
		"details", a.Details,
		"origin", msg.Origin,
		"timestamp", msg.Timestamp.Format("2006-01-02 15:04:05"),
	)
}

It is a good practice to include a version in your payloads if you expect that they might change over time, and if you may ever have multiple versions of the payload in flight. Then, in the code that handles the payloads, you can check the payload type and version and handle it appropriately. For small projects, this might be overkill.
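As a rough illustration of handling two versions in flight, the sketch below normalizes both into the newest struct. It decodes raw JSON with encoding/json directly rather than going through the messenger's registered payload types, and the v2 type string, the Severity field, and the decodeAlert helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AlertV1 matches the simple payload shown earlier.
type AlertV1 struct {
	Details string `json:"message"`
}

// AlertV2 is a hypothetical later version that adds a severity.
type AlertV2 struct {
	Details  string `json:"message"`
	Severity string `json:"severity"`
}

// decodeAlert accepts either version and returns the newest struct, so
// the rest of the handler only deals with one shape.
func decodeAlert(payloadType string, raw []byte) (AlertV2, error) {
	switch payloadType {
	case "com.example.alert.v1":
		var v1 AlertV1
		if err := json.Unmarshal(raw, &v1); err != nil {
			return AlertV2{}, err
		}
		// v1 carried no severity, so fill in a neutral default.
		return AlertV2{Details: v1.Details, Severity: "unknown"}, nil
	case "com.example.alert.v2":
		var v2 AlertV2
		if err := json.Unmarshal(raw, &v2); err != nil {
			return AlertV2{}, err
		}
		return v2, nil
	default:
		return AlertV2{}, fmt.Errorf("unsupported payload type %q", payloadType)
	}
}

func main() {
	old, err := decodeAlert("com.example.alert.v1", []byte(`{"message":"disk full"}`))
	fmt.Println(old, err)

	cur, err := decodeAlert("com.example.alert.v2", []byte(`{"message":"disk full","severity":"high"}`))
	fmt.Println(cur, err)

	_, err = decodeAlert("com.example.alert.v9", nil)
	fmt.Println(err)
}
```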

Federation Round-Trip Happy Path

See also: Federation Topologies

The diagram below shows the happy path for a federation message with a hub on host 1 and a client on host 2. In this case, a publisher on client host 2 publishes a message to a local channel on host 2, and then host 2 forwards the message on that same channel to the hub on host 1, and then a subscriber on host 1 processes the message from the local channel on host 1.

keyop_messenger_federation_happy_path.png

Host 1 - Hub + Subscriber

host1 will be acting as the 'hub', meaning that it listens on a port (7740) and waits for clients to connect. It also subscribes to the local 'alerts' channel, and registers a callback to be invoked for every message that arrives on the channel.

In this example config, there are no pub or sub channels defined for the AllowedPeers, so clients are allowed to publish and subscribe to any channels they request.

package main

import (
	"context"
	"log/slog"
	"path"

	messenger "github.com/wu/keyop-messenger"
)

// host1 is acting as the 'hub'
func host1(ctx context.Context, logger *slog.Logger, baseDir string) {

	tmpDir := path.Join(baseDir, "host1")
	logger.Info("host1: starting", "dataDir", tmpDir)

	// listen on port 7740 and wait for clients to connect
	cfg := &messenger.Config{
		Name: "host1",
		Storage: messenger.StorageConfig{
			DataDir: tmpDir,
		},
		Hub: messenger.HubConfig{
			Enabled:    true,
			ListenAddr: ":7740",
			// no channels are specified for 'localhost', so it can publish and subscribe to any channels it specifies
			AllowedPeers: []messenger.AllowedPeer{
				{
					Name: "localhost",
				},
			},
		},
		TLS: messenger.TLSConfig{
			Cert: path.Join(tmpDir, "cert", "host1.crt"),
			Key:  path.Join(tmpDir, "cert", "host1.key"),
			CA:   path.Join(tmpDir, "cert", "ca.crt"),
		},
	}
	cfg.ApplyDefaults()

	// Create a new instance of the messenger for this host.
	// Normally this would only happen once per host, but this example is showing federation on a single host.
	m, err := messenger.New(cfg, messenger.WithLogger(logger))
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := m.Close(); err != nil {
			logger.Error("failed to close messenger", "error", err)
		}
	}()

	// Register each of your payload types for typed decoding.
	if err := m.RegisterPayloadType("com.example.alert.v1", Alert{}); err != nil {
		panic(err)
	}

	// Subscribe to the local 'alerts' channel, and run the callback on each message that arrives there.
	// The subscriber ID is used when recording your local offset in the channel log.
	logger.Info("host1: subscribing to alerts topic on worker-1")
	err = m.Subscribe(ctx, "alerts", "worker-1", func(_ context.Context, msg messenger.Message) error {

		// check the payload type/version and respond accordingly
		if msg.PayloadType == "com.example.alert.v1" {
			// safely verify the payload type before using it
			a, ok := msg.Payload.(Alert)
			if !ok {
				logger.Error("failed to verify Alert payload type")
				return nil
			}

			// Just log the event details
			logger.Info("host1: received",
				"details", a.Details,
				"origin", msg.Origin,
				"service", msg.ServiceName,
				"timestamp", msg.Timestamp.Format("2006-01-02 15:04:05"),
				"count", a.Count,
			)
		}

		// Return nil to Ack the message, and allow the offset to be advanced.
		// Returning an error will cause the message to be retried after a backoff period.
		return nil
	})
	if err != nil {
		logger.Error("host1: failed to subscribe", "error", err)
		panic(err)
	}
	logger.Info("host1: subscribed successfully")

	logger.Info("host1: listening for messages from host1")
	<-ctx.Done()
	logger.Info("host1: shutting down")
}

I've built the config in code here so the paths can be built dynamically, but if you prefer to use a config file, it would look like this:

name: host1
storage:
  data_dir: /tmp/keyop-messenger-example/host1
hub:
  enabled: true
  listen_addr: ":7740"
  allowed_peers:
  - name: localhost
tls:
  cert: /tmp/keyop-messenger-example/host1/cert/host1.crt
  key: /tmp/keyop-messenger-example/host1/cert/host1.key
  ca: /tmp/keyop-messenger-example/host1/cert/ca.crt

And then to load the config file:

cfg, err := messenger.LoadConfig("host1.yaml")

Host 2 - Client + Publisher

host2 is acting as the 'client', meaning that it connects to the host1 hub on port (7740). In this case it only publishes a single channel to host1, the 'alerts' channel. So, any messages that get sent to the local 'alerts' channel on host2 will automatically get forwarded to the 'alerts' channel on host1.

host2 also registers a payload type for the Alert struct (since this is a different messenger instance communicating with the messenger instance for host1), and then starts publishing a message every second.

package main

import (
	"context"
	"log/slog"
	"path"
	"time"

	messenger "github.com/wu/keyop-messenger"
)

// host2 is acting as the client
func host2(ctx context.Context, logger *slog.Logger, baseDir string) {

	tmpDir := path.Join(baseDir, "host2")
	logger.Info("host2: starting", "dataDir", tmpDir)

	// connect to the hub on 'localhost' and publish the local 'alerts' channel to it.
	cfg := &messenger.Config{
		Name: "host2",
		Storage: messenger.StorageConfig{
			DataDir: tmpDir,
		},
		Client: messenger.ClientConfig{
			Enabled: true,
			Hubs: []messenger.ClientHubRef{
				{
					Addr: "localhost:7740",
					Publish: []string{
						"alerts",
					},
				},
			},
		},
		TLS: messenger.TLSConfig{
			Cert: path.Join(tmpDir, "cert", "host2.crt"),
			Key:  path.Join(tmpDir, "cert", "host2.key"),
			CA:   path.Join(tmpDir, "cert", "ca.crt"),
		},
	}
	cfg.ApplyDefaults()

	// Create a new instance of the messenger for this host.
	// Normally this would only happen once per host, but this example is showing federation on a single host.
	logger.Info("host2: creating messenger instance", "config", cfg)
	m, err := messenger.New(cfg, messenger.WithLogger(logger))
	if err != nil {
		logger.Error("failed to create messenger", "error", err)
		panic(err)
	}
	logger.Info("host2: messenger created successfully")
	defer func() {
		if err := m.Close(); err != nil {
			logger.Error("failed to close messenger", "error", err)
		}
	}()

	// Register payload types for typed decoding.
	if err := m.RegisterPayloadType("com.example.alert.v1", Alert{}); err != nil {
		logger.Error("failed to register payload type", "error", err)
		panic(err)
	}

	count := 0
	for {
		select {
		case <-ctx.Done():
			logger.Info("host2: shutting down")
			return
		default:
			// Publish to local channel with service identification.
			pubCtx := messenger.WithServiceName(ctx, "monitor-service")
			logger.Info("host2: publishing message", "service", "monitor-service")

			count++
			alert := Alert{Details: "system problem!", Count: count}
			// send the data using the registered payload type.   Blocks until write is confirmed to disk.
			if err := m.Publish(pubCtx, "alerts", "com.example.alert.v1", alert); err != nil {
				logger.Error("failed to publish message", "error", err)
			}
			time.Sleep(time.Second)
		}
	}
}

Again, I built the config from code, but here's the equivalent config:

name: host2
storage:
  data_dir: /tmp/keyop-messenger-example/host2
client:
  enabled: true
  hubs:
    - addr: "localhost:7740"
      publish:
      - "alerts"
tls:
  cert: /tmp/keyop-messenger-example/host2/cert/host2.crt
  key: /tmp/keyop-messenger-example/host2/cert/host2.key
  ca: /tmp/keyop-messenger-example/host2/cert/ca.crt

Running the Example

You can access the code in the example subdirectory of keyop-messenger. It contains an easy-to-use script to automate generating the necessary certs and starting the instances locally:

Start it up using the 'run.sh' script. Once it's running, you can shut it down by hitting ctrl+c. Here's an example of the output:

$ ./run.sh
Generating CA keys
Subject:     CN=keyop-ca
Valid from:  2026-04-29T00:43:12Z
Valid until: 2028-04-28T00:43:12Z
Fingerprint: SHA-256:68e783687758a47dd8a425ccb5b33243566a6f2b527464911985bc40620a6d72
Generating host1 keys
Subject:     CN=localhost
Valid from:  2026-04-29T00:43:12Z
Valid until: 2028-04-28T00:43:12Z
Fingerprint: SHA-256:8cd93a69e259d1ace0e0b170d94474d645beccb7ad61d493b58567a6f6593350
Generating host2 keys
Subject:     CN=localhost
Valid from:  2026-04-29T00:43:12Z
Valid until: 2028-04-28T00:43:12Z
Fingerprint: SHA-256:b88f856dedb3caf3e4132d1b4afb44d506f191757fd40476eb896327a830d6cb
Building example
2026/04/28 17:43:12 INFO host2: starting dataDir=/tmp/keyop-messenger-example/host2
2026/04/28 17:43:12 INFO host1: starting dataDir=/tmp/keyop-messenger-example/host1
2026/04/28 17:43:12 INFO host2: creating messenger instance config="&{Name:host2 Storage:{DataDir:/tmp/keyop-messenger-example/host2 SyncIntervalMS:0 MaxSubscriberLagMB:512 CompactionThresholdMB:256 OffsetFlushIntervalMS:0} Subscribers:{MaxRetries:0x71900521e060} Hub:{Enabled:false ListenAddr: AllowedPeers:[] FedClientOffsetTTL:168h0m0s} Client:{Enabled:true Hubs:[{Addr:localhost:7740 Subscribe:[alerts] Publish:[]}]} TLS:{Cert:/tmp/keyop-messenger-example/host2/cert/host2.crt Key:/tmp/keyop-messenger-example/host2/cert/host2.key CA:/tmp/keyop-messenger-example/host2/cert/ca.crt MinVersion:1.3 ExpiryWarnDays:30} Federation:{ReconnectBaseMS:500 ReconnectMaxMS:60000 ReconnectJitter:0.2 SendBufferMessages:10000 MaxBatchBytes:4194304} Dedup:{SeenIDLRUSize:100000} Audit:{MaxSizeMB:100 MaxFiles:10}}"
2026/04/28 17:43:12 INFO host1: subscribing to alerts topic on worker-1
2026/04/28 17:43:12 INFO host1: subscribed successfully
2026/04/28 17:43:12 INFO host1: listening for messages from host1
2026/04/28 17:43:12 INFO host2: messenger created successfully
2026/04/28 17:43:12 INFO host2: publishing message service=monitor-service
2026/04/28 17:43:12 INFO federation: hub accepted connection peer=localhost addr=127.0.0.1:57025
2026/04/28 17:43:12 INFO host1: received message="system problem!" origin=host2 service=monitor-service timestamp="2026-04-29 00:43:12" count=1
2026/04/28 17:43:13 INFO host2: publishing message service=monitor-service
2026/04/28 17:43:13 INFO host1: received message="system problem!" origin=host2 service=monitor-service timestamp="2026-04-29 00:43:13" count=2
2026/04/28 17:43:14 INFO host2: publishing message service=monitor-service
2026/04/28 17:43:14 INFO host1: received message="system problem!" origin=host2 service=monitor-service timestamp="2026-04-29 00:43:14" count=3
2026/04/28 17:43:15 INFO host2: publishing message service=monitor-service
2026/04/28 17:43:15 INFO host1: received message="system problem!" origin=host2 service=monitor-service timestamp="2026-04-29 00:43:15" count=4
2026/04/28 17:43:16 INFO host2: publishing message service=monitor-service
2026/04/28 17:43:16 INFO host1: received message="system problem!" origin=host2 service=monitor-service timestamp="2026-04-29 00:43:16" count=5
2026/04/28 17:43:17 INFO host2: publishing message service=monitor-service
2026/04/28 17:43:17 INFO host1: received message="system problem!" origin=host2 service=monitor-service timestamp="2026-04-29 00:43:17" count=6

... I hit ctrl+c here ...

^C2026/04/28 17:43:18 INFO host1: shutting down
2026/04/28 17:43:18 ERROR federation: receiver read error peer=localhost err="read tcp 127.0.0.1:7740->127.0.0.1:57025: use of closed network connection"
2026/04/28 17:43:18 ERROR federation: receiver read error peer=localhost:7740 err="websocket: close 1006 (abnormal closure): unexpected EOF"
2026/04/28 17:43:18 INFO host2: shutting down
2026/04/28 17:43:18 WARN removing temp directory path=/tmp/keyop-messenger-example

Federation Topologies

There are many patterns you can use to connect keyop-messenger instances:

I use a basic star topology for keyop:

keyop_messenger_federation_topology_1.png

If you have a client that publishes to multiple hubs for redundancy, it's recommended that you don't have any client pull that channel from more than one of those hubs. As long as everything is flowing smoothly, duplicates will generally be prevented by the in-memory LRU cache of recently-seen message IDs. But if one of the hubs goes offline for a while, and if either the LRU cache gets fully replaced on the subscriber or the subscriber instance gets restarted and loses the in-memory cache, then when the hub comes back online, you'll get a replay of all the old messages as they feed back into the restarted hub.
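To see why a full or freshly restarted cache lets duplicates through, here is a tiny LRU set of message IDs. It is a sketch of the general technique only. The seenCache type and its methods are hypothetical and are not taken from keyop-messenger.

```go
package main

import (
	"container/list"
	"fmt"
)

// seenCache remembers the most recently seen message IDs, up to capacity.
type seenCache struct {
	capacity int
	order    *list.List               // front = most recently seen ID
	index    map[string]*list.Element // ID -> its position in order
}

func newSeenCache(capacity int) *seenCache {
	return &seenCache{
		capacity: capacity,
		order:    list.New(),
		index:    make(map[string]*list.Element),
	}
}

// firstTime records id and reports whether it was absent from the cache,
// meaning the message should be delivered rather than dropped.
func (c *seenCache) firstTime(id string) bool {
	if el, ok := c.index[id]; ok {
		c.order.MoveToFront(el)
		return false
	}
	c.index[id] = c.order.PushFront(id)
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.index, oldest.Value.(string))
	}
	return true
}

func main() {
	c := newSeenCache(2)
	// "a" is suppressed while it is still cached, but once "b" and "c"
	// push it out, a replayed "a" looks new again.
	for _, id := range []string{"a", "a", "b", "c", "a"} {
		fmt.Println(id, "deliver:", c.firstTime(id))
	}
}
```

A replay from a hub that has been offline behaves like the final "a" here: once the ID has been evicted or the process has restarted, the cache has no record of it.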

Here's a diagram illustrating some patterns for individual channels.

keyop_messenger_federation_topology_3.png

Observability

In the storage data directory, you will find three directories:

Log File Rotation

Channel logs are rotated when they reach storage.compaction_threshold_mb, which defaults to 256. Old channel logs are removed when all subscribers have moved beyond them, or when they are storage.max_subscriber_lag_mb behind, which defaults to 512.

Audit logs are automatically rotated when they reach audit.max_size_mb, which defaults to 100. The number of rotated audit files that are kept is set by audit.max_files, which defaults to 10.

storage:
  compaction_threshold_mb: 256
  max_subscriber_lag_mb: 512
audit:
  max_size_mb: 100
  max_files: 10

Maximum Message Sizes

keyop-messenger supports a maximum message size of 10MB. This is not currently configurable. keyop-messenger supports sending attachments this large, but it is not optimized for it.

The maximum message size that can be sent to a peer is controlled by federation.max_batch_bytes. If a single message exceeds this on the receiving side, it is logged and then skipped. The sender receives an ack so it advances past the message and the connection stays alive.

The default value of federation.max_batch_bytes is 4 MiB. Adjust in config if your payloads are larger:

federation:
  max_batch_bytes: 8388608  # 8 MiB

Set it to 0 to disable the limit, but note that the 10MB maximum message size still applies.

If you want to send binary attachments in the payload, you can base64 encode them first, e.g.:

encodedData := base64.StdEncoding.EncodeToString(data)

Note that the attachment data will occupy space in the channel logs on every system they pass through until they get rotated (see Log File Rotation).
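If payloads are serialized with Go's encoding/json (an assumption here, suggested by the json struct tags in the examples), a []byte field is written as a base64 string automatically, so the manual encoding step can be skipped. The Report struct and the encode and decode helpers below are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Report is a hypothetical payload carrying a binary attachment.
type Report struct {
	Name string `json:"name"`
	Data []byte `json:"data"` // encoding/json writes []byte as a base64 string
}

// encode returns the JSON form of r as it would appear on one log line.
func encode(r Report) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

// decode parses a JSON line back into a Report, restoring the raw bytes.
func decode(line string) (Report, error) {
	var r Report
	err := json.Unmarshal([]byte(line), &r)
	return r, err
}

func main() {
	line, err := encode(Report{Name: "dump.bin", Data: []byte{0x00, 0xff, 0x10}})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(line)

	back, err := decode(line)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(back.Name, back.Data)
}
```

Base64 produces four output bytes for every three input bytes, so an attachment grows by about a third once encoded. Keep that in mind when sizing attachments against the message size limits.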

Also, since the channel logs are in .jsonl format, each large message will be stored on a single line in the log file. Most modern Unix tools can deal with very long lines, but some tools may have issues, e.g. sed, awk.
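The same caveat applies if you write your own Go tooling to inspect the channel logs: bufio.Scanner stops with bufio.ErrTooLong when a line exceeds its default 64 KiB token limit. The sketch below shows the failure and the fix using Scanner.Buffer. The scanLines and sampleLog helpers are hypothetical, and the 1 MiB limit is just an example value.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// sampleLog builds a two-line .jsonl document whose second line carries
// a payload of the given size.
func sampleLog(size int) string {
	return "{\"id\":1}\n" + "{\"blob\":\"" + strings.Repeat("A", size) + "\"}\n"
}

// scanLines counts the lines in data. A maxLine of 0 keeps the scanner's
// default limit; any other value raises the limit to maxLine bytes.
func scanLines(data string, maxLine int) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(data))
	if maxLine > 0 {
		sc.Buffer(make([]byte, 0, 64*1024), maxLine)
	}
	n := 0
	for sc.Scan() {
		n++
	}
	return n, sc.Err()
}

func main() {
	data := sampleLog(100000)

	n, err := scanLines(data, 0)
	fmt.Println("default limit:", n, "lines read, err:", err)

	n, err = scanLines(data, 1<<20)
	fmt.Println("1 MiB limit:", n, "lines read, err:", err)
}
```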

Trading Durability for Speed

By default, keyop-messenger performs an fsync after every publish, and also after every subscriber offset update. This ensures maximum durability, so that if a crash happens, as long as the disk is not corrupt, your data should be safe. But doing this significantly limits throughput as volume grows.

To increase throughput, you can switch to using periodic sync intervals:

If your data is not critical (e.g. sensor readings like temperature, where missing a few data points in a relatively rare system crash scenario is probably not an issue), then I recommend setting both values to 200ms to reduce the performance cost.

storage:
  data_dir: ~/.keyop/msgs
  offset_flush_interval_ms: 200
  sync_interval_ms: 200

Kafka solves this problem by replicating data to multiple nodes, so even if one node crashes, the data is still available on the other nodes. But keyop-messenger always delivers messages to a local channel file first, and then publishes to remote hubs from the local channel file, so it relies on a local fsync to ensure the data was persisted to the local disk.
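The trade-off comes down to how often File.Sync is called. The following sketch counts fsyncs for ten writes spaced 50ms apart on a simulated clock, first syncing on every write and then at most once per 200ms. It is an illustration only. The logWriter type and countSyncs helper are hypothetical, and a real implementation would more likely flush from a background timer so the final writes are not left waiting for the next write to arrive.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// logWriter appends lines to a file and decides when to fsync.
type logWriter struct {
	f        *os.File
	interval time.Duration // 0 means fsync after every write
	lastSync time.Time
	syncs    int // number of fsync calls, kept for illustration
}

// write appends one line. The caller passes the current time so the
// policy can be exercised with a simulated clock.
func (w *logWriter) write(line string, now time.Time) error {
	if _, err := w.f.WriteString(line + "\n"); err != nil {
		return err
	}
	if w.interval == 0 || now.Sub(w.lastSync) >= w.interval {
		if err := w.f.Sync(); err != nil {
			return err
		}
		w.lastSync = now
		w.syncs++
	}
	return nil
}

// countSyncs performs the given number of writes, advancing a simulated
// clock by step after each one, and reports how many fsyncs happened.
func countSyncs(interval time.Duration, writes int, step time.Duration) (int, error) {
	f, err := os.CreateTemp("", "channel-*.jsonl")
	if err != nil {
		return 0, err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	w := &logWriter{f: f, interval: interval}
	now := time.Unix(0, 0)
	for i := 0; i < writes; i++ {
		if err := w.write(`{"reading":21.5}`, now); err != nil {
			return w.syncs, err
		}
		now = now.Add(step)
	}
	return w.syncs, nil
}

func main() {
	everyWrite, err := countSyncs(0, 10, 50*time.Millisecond)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	periodic, err := countSyncs(200*time.Millisecond, 10, 50*time.Millisecond)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("fsyncs with interval 0:", everyWrite)
	fmt.Println("fsyncs with interval 200ms:", periodic)
}
```

With the periodic policy, any writes made since the last fsync can be lost in a crash, which is the durability being traded away.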

Benchmarks

With sync_interval_ms and offset_flush_interval_ms set to 0 for maximum safety and durability:

Action                  Ops/sec    Time per Op
Publish message         400        2.5 ms
Subscribe               200        6 ms
Federated Round-Trip    100        10 ms

With sync_interval_ms and offset_flush_interval_ms set to 200ms:

Action                  Ops/sec    Time per Op
Publish message         110,000    10 µs
Subscribe               15,000     70 µs
Federated Round-Trip    9,000      120 µs

Refer to Trading Durability For Speed for more information.

Times recorded on an Apple M2 Max. You can test this on your own machine by running: make bench

Benchmarks for the round-trip message path exclude network travel time since the two messenger processes were measured running on the same host.

See Also