☄️ Comet

High-performance embedded segmented log for edge observability. Built for single-digit microsecond latency and bounded resources.

Architecture Guide | Performance Guide | Troubleshooting | Security | API Reference

Note

This is very much an experiment in vibe coding. While the ideas are sound and the test coverage is robust, keep that in mind before relying on it in production.

What is Comet?

Comet is a segmented append-only log optimized for observability data (metrics, logs, traces) at edge locations. It implements the same pattern as Kafka's storage engine - append-only segments with time/size-based retention - but embedded directly in your service with aggressive deletion policies for resource-constrained environments.

Each shard maintains a series of immutable segment files that are rotated at size boundaries and deleted based on retention policies, ensuring predictable resource usage without the complexity of circular buffers or in-place overwrites.

Comet requires local filesystems (ext4, xfs, etc.) for its microsecond latency guarantees. It is unapologetically local. If you need distributed storage, use a proper distributed system like NATS JetStream or Kafka instead.

The Edge Storage Problem

Edge deployments need local observability buffering, but other solutions fall short:

  • Kafka: Requires clusters, complex ops, ~1-5ms latency
  • RocksDB: Single-threaded writes at 50-200μs, 100ms+ stalls during compaction
  • Redis: Requires separate server, memory-only without persistence config
  • Ring buffers: No persistence, no compression, data loss on overflow
  • Files + rotation: No indexing, no consumer tracking, manual everything

The gap: No embedded solution with Kafka's reliability at microsecond latencies.

Features

  • High throughput at microsecond latency: up to 12,634,200 entries/sec with batching (2.4M ops/sec with optimal sharding)
    • Comet uses periodic checkpoints (default: every 1000 writes or 1 second) to persist data to disk. Between checkpoints, writes are acknowledged after being written to the OS page cache.
  • Predictable performance: No compaction stalls or write amplification like LSM-trees
  • True multi-process support: Hybrid coordination (mmap + file locks), crash-safe rotation, real OS processes
  • O(log n) lookups: Binary searchable index with bounded memory usage
  • Lock-free reads: Atomic pointers, zero-copy via mmap with memory safety
  • Automatic retention: Time and size-based cleanup, protects unconsumed data
  • Production ready: Crash recovery, built-in metrics, extensive testing
  • Smart sharding: Consistent hashing, automatic discovery, batch optimizations
  • Optional zstd compression: ~37% storage savings when needed

Multi-Process Coordination

Unlike other embedded solutions, Comet enables true multi-process coordination through memory-mapped state files. Perfect for prefork web servers and multi-process deployments.

  • Automatic shard ownership - Each process owns specific shards based on shardID % processCount == processID
  • Per-shard state files - Each shard has its own comet.state for metrics and recovery
  • Memory-mapped coordination - Lock-free operations through atomic memory access
  • Crash-safe design - State files enable automatic recovery on restart

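The ownership rule is easy to reason about in isolation. Here is a minimal standalone sketch of the same modulo check (an illustration, not Comet's internal code):

// ownsShard reports whether a process owns a shard under the
// shardID % processCount == processID rule described above.
func ownsShard(shardID, processCount, processID uint32) bool {
    return shardID%processCount == processID
}

// With 4 processes, process 1 owns shards 1, 5, 9, 13, ...
// ownsShard(5, 4, 1) -> true
// ownsShard(6, 4, 1) -> false
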
How Does Comet Compare?

Feature          | Comet                      | Kafka               | Redis Streams      | RocksDB            | Proof
Write Latency    | 1.7μs (33μs multi-process) | 1-5ms               | 50-100μs           | 50-200μs           | Benchmarks
Multi-Process    | ✅ Real OS processes       | ✅ Distributed      | ❌ Single process  | ⚠️ Mutex locks     | Tests
Resource Bounds  | ✅ Time & size limits      | ⚠️ JVM heap         | ⚠️ Memory only     | ⚠️ Manual compact  | Retention
Crash Recovery   | ✅ Automatic               | ✅ Replicas         | ⚠️ AOF/RDB         | ✅ WAL             | Recovery
Zero Copy Reads  | ✅ mmap                    | ❌ Network          | ❌ Serialization   | ❌ Deserialization | Reader
Storage Overhead | ~12 bytes/entry            | ~50 bytes/entry     | ~20 bytes/entry    | ~30 bytes/entry    | Format
Sharding         | ✅ Built-in                | ✅ Partitions       | ❌ Manual          | ❌ Manual          | Client
Compression      | ✅ Optional zstd           | ✅ Multiple codecs  | ❌ None            | ✅ Multiple        | Config
Embedded         | ✅ Native                  | ❌ Requires cluster | ❌ Requires server | ✅ Native          | -

Quick Start

The Easy Way™

Step 1: Create a client

client, err := comet.NewClient("/var/lib/comet")
defer client.Close()

Step 2: Write your events

// Pick which shard to write to based on a key (for consistent routing)
// High cardinality keys (e.g. uuid) are recommended for consistent routing
stream := client.PickShardStream("events:v1", event.ID, 256)
// This returns something like "events:v1:shard:00A7" based on hash(event.ID) % 256

ids, err := client.Append(ctx, stream, [][]byte{
    []byte(event.ToJSON()),
})

Step 3: Process events

consumer := comet.NewConsumer(client, comet.ConsumerOptions{
    Group: "my-processor",
})

// Process() is the main API - it handles everything for you!
// By default, it discovers and processes ALL shards automatically
err = consumer.Process(ctx, func(ctx context.Context, messages []comet.StreamMessage) error {
    for _, msg := range messages {
        processEvent(msg.Data)  // Your logic here
    }
    return nil  // Success = automatic progress tracking
})

That's it! Comet handles:

  • Compression - Large events compressed automatically
  • Sharding - Load distributed across the shard count you choose (256 in the example above)
  • Retries - Failed batches retry automatically
  • Progress - Consumer offsets tracked per shard
  • Cleanup - Old data deleted automatically
  • Recovery - Crash? Picks up where it left off

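Putting the three steps together, a minimal end-to-end program could look like the sketch below. The import path is assumed from the repository name, the JSON payload is a placeholder, and error handling is kept deliberately short:

package main

import (
    "context"
    "log"

    "github.com/orbiterhq/comet"
)

func main() {
    ctx := context.Background()

    // Step 1: create a client backed by a local filesystem path.
    client, err := comet.NewClient("/var/lib/comet")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Step 2: route by a high-cardinality key and append a batch.
    stream := client.PickShardStream("events:v1", "order-7d2f", 256)
    if _, err := client.Append(ctx, stream, [][]byte{
        []byte(`{"type":"order.created","id":"order-7d2f"}`),
    }); err != nil {
        log.Fatal(err)
    }

    // Step 3: process everything, across all shards, until ctx is cancelled.
    consumer := comet.NewConsumer(client, comet.ConsumerOptions{Group: "my-processor"})
    if err := consumer.Process(ctx, func(ctx context.Context, messages []comet.StreamMessage) error {
        for _, msg := range messages {
            log.Printf("got %d bytes", len(msg.Data))
        }
        return nil
    }); err != nil {
        log.Fatal(err)
    }
}
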
Want more control? Scale horizontally:

// Deploy this same code across 3 processes:
err = consumer.Process(ctx, processEvents,
    comet.WithStream("events:v1:shard:*"),
    comet.WithConsumerAssignment(workerID, numWorkers),  // This worker + total count
)
// Each worker processes different shards automatically!
// No coordination needed - Comet handles it

Production-Ready Example

// Define your processing function with context support
processEvents := func(ctx context.Context, messages []comet.StreamMessage) error {
    for _, msg := range messages {
        // Check for cancellation
        if ctx.Err() != nil {
            return ctx.Err()
        }
        // Process each message
        if err := handleEvent(msg.Data); err != nil {
            return err // Will trigger retry
        }
    }
    return nil
}

err = consumer.Process(ctx, processEvents,
    comet.WithStream("events:v1:shard:*"),
    comet.WithBatchSize(1000),
    comet.WithPollInterval(50 * time.Millisecond),

    // Optional: Add observability
    comet.WithErrorHandler(func(err error, retryCount int) {
        metrics.Increment("comet.errors", 1)
        log.Printf("Retry %d: %v", retryCount, err)
    }),
    comet.WithBatchCallback(func(size int, duration time.Duration) {
        metrics.Histogram("comet.batch.size", float64(size))
        metrics.Histogram("comet.batch.duration_ms", duration.Milliseconds())
    }),
)

Need to tweak something?

// Only override what you need:
config := comet.DefaultCometConfig()
config.Retention.MaxAge = 24 * time.Hour  // Keep data longer
client, err := comet.NewClient("/var/lib/comet", config)

Configuration Structure

type CometConfig struct {
    Compression CompressionConfig  // Controls compression behavior
    Indexing    IndexingConfig     // Controls indexing and lookup
    Storage     StorageConfig      // Controls file storage
    Concurrency ConcurrencyConfig  // Controls multi-process behavior
    Retention   RetentionConfig    // Controls data retention
}

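As a sketch of how these sub-configs are addressed in practice, the example below only touches fields that appear elsewhere in this README (MaxFileSize and MaxAge); the Compression, Indexing, and Concurrency sub-configs have their own fields that are not listed in this document:

config := comet.DefaultCometConfig()
config.Storage.MaxFileSize = 64 << 20    // StorageConfig: rotate segments at 64MB
config.Retention.MaxAge = 4 * time.Hour  // RetentionConfig: delete segments older than 4h
client, err := comet.NewClient("/var/lib/comet", config)
if err != nil {
    log.Fatal(err)
}
defer client.Close()
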
Architecture

┌─────────────────┐     ┌─────────────────┐
│   Your Service  │     │  Your Service   │
│    Process 0    │     │    Process 1    │
│  ┌───────────┐  │     │  ┌───────────┐  │
│  │   Comet   │  │     │  │   Comet   │  │
│  │  Client   │  │     │  │  Client   │  │
│  └─────┬─────┘  │     │  └─────┬─────┘  │
└────────┼────────┘     └────────┼────────┘
         │                       │
         ▼                       ▼
    ┌──────────────────────────────────┐
    │      Segmented Log Storage       │
    │                                  │
    │  Shard 0: [seg0][seg1][seg2]→    │ ← Process 0
    │           [comet.state]          │
    │  Shard 1: [seg0][seg1]→          │ ← Process 1
    │           [comet.state]          │
    │  Shard 2: [seg0][seg1][seg2]→    │ ← Process 0
    │           [comet.state]          │
    │  Shard 3: [seg0][seg1]→          │ ← Process 1
    │           [comet.state]          │
    │  ...                             │
    │                                  │
    │  ↓ segments deleted by retention │
    └──────────────────────────────────┘

Performance Optimizations

Comet achieves microsecond-level latency through careful optimization:

  1. Lock-Free Reads: Memory-mapped files with atomic pointers and defensive copying for memory safety
  2. Binary Searchable Index: O(log n) entry lookups instead of linear scans
  3. Vectored I/O: Batches multiple writes into single syscalls
  4. Batch ACKs: Groups acknowledgments by shard to minimize lock acquisitions
  5. Pre-allocated Buffers: Reuses buffers to minimize allocations
  6. Concurrent Shards: Each shard has independent locks for parallel operations

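Point 3 pairs naturally with the Append API shown earlier, which takes a slice of entries: handing Comet several entries per call is what lets it coalesce them into fewer syscalls, and it is where the batched-throughput numbers above come from. A small sketch (pendingEvents is a placeholder for whatever your service buffers):

// One Append call with many entries is far cheaper than many calls with one entry each.
batch := make([][]byte, 0, len(pendingEvents))
for _, e := range pendingEvents {
    batch = append(batch, []byte(e.ToJSON()))
}
if _, err := client.Append(ctx, stream, batch); err != nil {
    log.Printf("append failed: %v", err)
}
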
How It Works

  1. Append-Only Segments: Data is written to segment files that grow up to MaxFileSize
  2. Segment Rotation: When a segment reaches max size, it's closed and a new one starts
  3. Binary Index: Entry locations are indexed for O(log n) lookups with bounded memory
  4. Retention: Old segments are deleted based on age (MaxAge) or total size limits
  5. Sharding: Load is distributed across multiple independent segmented logs
  6. Index Limits: Index memory is capped by MaxIndexEntries - older entries are pruned

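As an illustration of steps 2 and 4 (not Comet's internal code), the rotation and retention decisions boil down to checks like these; the MaxShardSize cap and the protection of unconsumed data are described in the Features and Resource Usage sections:

// Rotation (step 2): close the active segment once it would exceed MaxFileSize.
func shouldRotate(segmentSize, entrySize, maxFileSize int64) bool {
    return segmentSize+entrySize > maxFileSize
}

// Retention (step 4): a closed segment becomes deletable once it is older than
// MaxAge or the shard is over its size cap - unless a consumer still needs it.
func deletable(age, maxAge time.Duration, shardSize, maxShardSize int64, consumed bool) bool {
    if !consumed {
        return false // retention protects unconsumed data
    }
    return age > maxAge || shardSize > maxShardSize
}
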
Resource Usage

With default settings:

  • Memory: ~2MB per shard (10k index entries × ~200 bytes/entry)
  • Disk: Bounded by retention policy (MaxShardSize × shard count)
  • CPU: Minimal - compression happens outside locks

Example: 16 shards × 2MB index = 32MB memory for indexes
Example: 16 shards × 1GB/shard = 16GB max disk usage

Sharding

Comet uses deterministic sharding for load distribution. Here's how it works:

Writing

// The key (event ID, user ID, tenant, etc.) determines which shard gets the data
stream := client.PickShardStream("events:v1", event.ID, 256)
// Returns "events:v1:shard:0007" (hash("user-123") % 16 = 7)

// The key is ONLY used for routing - it's not stored anywhere!
client.Append(ctx, stream, data)

Reading

// Process ALL shards (recommended)
err = consumer.Process(ctx, handler,
    comet.WithStream("events:v1:shard:*"))  // The * wildcard finds all shards

// Process specific shards only
err = consumer.Process(ctx, handler,
    comet.WithShards(0, 1, 2))  // Only process shards 0, 1, and 2

// Process with advanced options
err = consumer.Process(ctx, handler,
    comet.WithStream("events:v1:shard:*"),
    comet.WithBatchSize(1000),
    comet.WithConsumerAssignment(workerID, 3))  // This is worker 'workerID' of 3 total workers

Use Cases

Right tool for:

  • Edge deployments with limited storage
  • High-frequency observability data (metrics, logs, traces)
  • Recent data access patterns (debugging last N hours)
  • Local buffering before shipping to cloud
  • Multi-service nodes requiring predictable resource usage

Not for:

  • Network filesystems (NFS, CIFS, etc.)
  • Long-term storage (use S3/GCS)
  • Transactional data requiring ACID
  • Random access patterns
  • Complex queries or aggregations

Performance notes

Durability Note: Comet uses periodic checkpoints (default: every 1000 writes or 1 second) to persist metadata to disk. Between checkpoints, writes are acknowledged after being written to the OS page cache. This provides excellent performance while maintaining durability through:

  • OS page cache (typically synced within 30 seconds)
  • Explicit fsync on file rotation
  • Crash recovery that rebuilds state from data files

Performance Metrics

  • ACK performance: 201ns per ACK (5M ACKs/sec) for single ACKs
  • Memory efficiency: 7 allocations per write batch, 4 allocations per ACK
  • Storage overhead: 12 bytes per entry (4-byte length + 8-byte timestamp)

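As a quick worked example of the last point: buffering 10 million entries adds roughly 10,000,000 × 12 bytes ≈ 120 MB of framing overhead on top of the payload bytes, before any compression.
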
Configuration

Single-Process vs Multi-Process Mode

Comet supports both single-process and multi-process deployments with a unified state management system:

// Single-process mode (default) - fastest performance
client, err := comet.NewClient("/data/streams")

// Multi-process mode - for prefork/multi-process deployments
client, err := comet.NewMultiProcessClient("/data/streams")

How multi-process mode works:

  • Each process owns specific shards based on: shardID % processCount == processID
  • Processes automatically coordinate through memory-mapped state files
  • No configuration needed - just use NewMultiProcessClient()
  • Automatic process ID assignment from a pool

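A sketch of what each prefork worker might run (the import path, requestID, and payload are placeholders; everything else uses calls shown elsewhere in this README):

// Every worker runs the same code; shard ownership is negotiated
// through the memory-mapped state files, not by configuration.
client, err := comet.NewMultiProcessClient("/var/lib/comet")
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// In multi-process mode, PickShard (and thus PickShardStream) only returns
// shards this process owns, so each worker writes to its own shards.
stream := client.PickShardStream("events:v1", requestID, 256)
if _, err := client.Append(ctx, stream, [][]byte{payload}); err != nil {
    log.Printf("append failed: %v", err)
}
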
When to use multi-process mode:

  • Process isolation is critical
  • Using prefork web servers (e.g., Go Fiber with prefork)
  • Need independent process scaling
  • You're already batching writes (reduces the latency impact)

When to use single-process mode (default):

  • Single service deployment
  • Don't need process-level isolation

Sharding Configuration and Performance

Comet uses sharding to enable parallel writes and horizontal scaling. Based on extensive benchmarking, here are the optimal configurations:

Performance Benchmarks

With a 3GB memory budget and 16 concurrent threads on 2023 MacBook Air w/ Apple M2 chip:

Configuration              | Ops/sec | Latency | Performance vs Default
1 shard × 1GB              | 748K    | 1.0μs   | Baseline
4 shards × 768MB (default) | 912K    | 1.0μs   | -
16 shards × 192MB          | 1.3M    | <1μs    | +45%
64 shards × 48MB           | 1.5M    | <1μs    | +65%
256 shards × 10MB          | 2.4M    | <1μs    | +168%
1024 shards × 4MB          | 7.8K    | 127μs   | -91% (degradation)

Multi-Process Performance

With 256 shards and varying process counts on 2023 MacBook Air w/ Apple M2 chip:

Processes | Shards/Process | Ops/sec | Performance Gain
1         | 256            | 1.3M    | Baseline
4         | 64             | 5.3M    | 4x
8         | 32             | 6.2M    | 4.8x
16        | 16             | 6.4M    | 4.9x

Recommended Configurations

For single-process deployments:

// Option 1: Use the OptimizedConfig helper
config := comet.OptimizedConfig(256, 3072)  // 256 shards with 3GB memory budget
client, err := comet.NewClient("/data", config)

// Option 2: Manual configuration
config := comet.DefaultCometConfig()
config.Storage.MaxFileSize = 10 << 20  // 10MB

// Use 256 shards when creating streams
stream := client.PickShardStream("events:v1", event.ID, 256)

For multi-process deployments:

// 4-8 processes with 256 total shards works best
config := comet.MultiProcessConfig()
config.Storage.MaxFileSize = 10 << 20  // 10MB
config.Concurrency.ProcessCount = 4    // 64 shards per process

Sharding Best Practices

  1. Choose shard count upfront: Changing shard count requires data migration
  2. Use high-cardinality keys: UUIDs, user IDs, or request IDs for even distribution
  3. Powers of 2: Use 16, 64, 256 shards for optimal hash distribution
  4. File size scaling: Smaller files (10-50MB) work better with many shards
  5. Memory budget: Plan enough memory for the memory-mapped files across all shards (~2MB of index per shard with defaults)

How Sharding Works

// Helper functions for shard management
stream := client.PickShardStream("events:v1", uniqueKey, 256) // One-liner
shardID := client.PickShard(uniqueKey, 256)                   // Get shard ID
streamName := comet.ShardStreamName("events:v1", shardID)     // "events:v1:shard:00FF"

// In multi-process mode, PickShard returns only shards owned by this process
// In single-process mode, returns any shard from 0 to shardCount-1

// Get all shards for parallel processing
shardIDs := comet.AllShardsRange(256)                   // [0, 1, ..., 255]
streams := comet.AllShardStreams("events", "v1", 256)   // All stream names

// Consumers automatically discover all shards
consumer.Process(ctx, handler,
    comet.WithStream("events:v1:shard:*"))  // Wildcard pattern

License

MIT