
Deep Dive: Clustering

This document provides an exhaustive description of HyperbyteDB's clustering subsystem. It covers the node state machine, cluster bootstrap, startup and reconnect sync, Raft consensus for schema mutations, write replication, the replication log, Merkle-tree anti-entropy, delta sync, and graceful drain.


Table of Contents

  1. Architecture Overview
  2. Node State Machine
  3. Cluster Bootstrap
  4. Startup Sync
  5. Reconnect Sync
  6. Raft Consensus
  7. Write Replication
  8. Mutation Replication
  9. Replication Log
  10. Anti-Entropy (Merkle Trees)
  11. Delta Sync
  12. Graceful Drain
  13. Peer Internal Endpoints
  14. Cluster Configuration
  15. Metrics

1. Architecture Overview

HyperbyteDB uses a hybrid replication model:

  • Data writes are replicated to all peers (master-master). Each node independently accepts writes and fans them out via HTTP. The fan-out is per-node configurable as either async (fire-and-forget; default) or sync_quorum (await W-of-N peer acks before responding to the client). See Section 7 -- Write Replication.
  • Schema mutations (CREATE/DROP DATABASE, DELETE, user management, CQ management) are routed through Raft consensus (via openraft) to ensure consistent ordering across the cluster. Schema replication uses a separate code path and is not affected by the data replication mode.
            +---------+     +---------+     +---------+
            | Node 1  |<--->| Node 2  |<--->| Node 3  |
            +---------+     +---------+     +---------+
               |                |                |
               v                v                v
          Write repl.      Write repl.      Write repl.
          (async HTTP)     (async HTTP)     (async HTTP)
               |                |                |
               +-------+-------+-------+--------+
                       |               |
                  Raft (schema mutations)
                       |
              openraft consensus

Key properties

  • Every node is readable and writable -- clients can connect to any node.
  • Write replication is fire-and-forget by default -- the local write succeeds immediately; replication is best-effort with retries. In sync_quorum mode the coordinator instead waits for peer acks before responding.
  • Schema mutations are consistent -- Raft ensures all nodes apply mutations in the same order.
  • Anti-entropy via Merkle trees detects and repairs divergence.
  • Graceful drain ensures no data loss when removing a node.

2. Node State Machine

File: src/cluster/membership.rs

Each node in the cluster transitions through the following states:

  Joining --> Syncing --> Active --> Draining --> Leaving
                 ^           |
                 |           v
                 +---- Disconnected

States

State | Description | Accepts Writes | Accepts Queries
Joining | Node is requesting to join the cluster | No | No
Syncing | Node is synchronizing data from a peer | No | No
Active | Node is fully operational | Yes | Yes
Disconnected | Node has missed heartbeats | No | Yes (stale)
Draining | Node is preparing to leave (flushing WAL, waiting for acks) | No | Yes
Leaving | Node has completed drain and is shutting down | No | No

State transitions

  • Joining -> Syncing: After sending a join request to an active peer.
  • Syncing -> Active: After successful data synchronization.
  • Active -> Disconnected: When heartbeat misses exceed heartbeat_miss_threshold.
  • Disconnected -> Syncing: When the node reconnects and needs to catch up.
  • Active -> Draining: When a graceful shutdown is initiated.
  • Draining -> Leaving: After the drain procedure completes.
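The documented edges can be captured as a small transition check. A minimal sketch -- the NodeState name mirrors src/cluster/membership.rs, but the helper methods here are illustrative, not the actual API:

```rust
// Sketch of the Section 2 state machine. The transition table is taken from
// the documented edges above; `can_transition_to` and `accepts_writes` are
// hypothetical helpers, not necessarily the real method names.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NodeState {
    Joining,
    Syncing,
    Active,
    Disconnected,
    Draining,
    Leaving,
}

impl NodeState {
    /// True if the documented edges allow `self -> next`.
    pub fn can_transition_to(self, next: NodeState) -> bool {
        use NodeState::*;
        matches!(
            (self, next),
            (Joining, Syncing)
                | (Syncing, Active)
                | (Active, Disconnected)
                | (Disconnected, Syncing)
                | (Active, Draining)
                | (Draining, Leaving)
        )
    }

    /// Whether the node accepts client writes in this state (per the table above).
    pub fn accepts_writes(self) -> bool {
        matches!(self, NodeState::Active)
    }
}
```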

ClusterMembership

pub struct ClusterMembership {
    pub version: u64,                      // Monotonic version counter
    pub nodes: HashMap<u64, NodeInfo>,     // node_id -> NodeInfo
}

pub struct NodeInfo {
    pub node_id: u64,
    pub addr: String,
    pub state: NodeState,
    pub joined_at: i64,
    pub last_heartbeat: i64,
    pub needs_sync: bool,
}
  • version is bumped on every mutation (add_node, remove_node, set_state).
  • SharedMembership = Arc<RwLock<ClusterMembership>> provides thread-safe shared access.

Key methods

Method | Description
active_peers(exclude_id) | Returns nodes with state == Active, excluding the given node
all_peers(exclude_id) | Returns all nodes except the given one
next_node_id() | Returns max(node_ids) + 1 for assigning new node IDs
set_needs_sync(node_id, bool) | Marks a node for re-synchronization
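These methods can be sketched against a trimmed version of the structs above (NodeInfo is reduced to the fields the sketch needs; the method bodies are illustrative):

```rust
use std::collections::HashMap;

// Minimal sketch of ClusterMembership. Field names follow the structs shown
// above; NodeInfo is trimmed and the method bodies are assumptions.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NodeState { Joining, Syncing, Active, Disconnected, Draining, Leaving }

#[derive(Clone, Debug)]
pub struct NodeInfo {
    pub node_id: u64,
    pub addr: String,
    pub state: NodeState,
}

#[derive(Default)]
pub struct ClusterMembership {
    pub version: u64,                  // monotonic version counter
    pub nodes: HashMap<u64, NodeInfo>, // node_id -> NodeInfo
}

impl ClusterMembership {
    /// Nodes with state == Active, excluding the given node.
    pub fn active_peers(&self, exclude_id: u64) -> Vec<&NodeInfo> {
        self.nodes
            .values()
            .filter(|n| n.node_id != exclude_id && n.state == NodeState::Active)
            .collect()
    }

    /// max(node_ids) + 1 for assigning new node IDs.
    pub fn next_node_id(&self) -> u64 {
        self.nodes.keys().copied().max().unwrap_or(0) + 1
    }

    pub fn set_state(&mut self, node_id: u64, state: NodeState) {
        if let Some(n) = self.nodes.get_mut(&node_id) {
            n.state = state;
            self.version += 1; // every mutation bumps the version counter
        }
    }
}
```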

3. Cluster Bootstrap

File: src/cluster/bootstrap.rs

ClusterBootstrap::init(config)

Called during application startup when cluster.enabled = true.

  1. Create replication log directory if it doesn't exist.
  2. Open ReplicationLog -- RocksDB-backed store for WAL and mutation ack tracking.
  3. Initialize membership -- create ClusterMembership with the local node as Active.
  4. Build PeerClient -- HTTP client for replication, configured with the peer list.
  5. Derive peer_addrs from config.cluster.peers (comma-separated).

start_raft(config, metadata)

  1. Open RaftStore (RocksDB with meta, logs, state column families) in config.raft_dir.
  2. Build Raft Network (HTTP transport).
  3. Build openraft::Raft<TypeConfig> with configured heartbeat interval, election timeout, and snapshot threshold.
  4. If node_id == 1: Initialize the Raft cluster with the local node as the sole voter.
  5. Return HyperbytedbRaft.

run_startup_sync(config, metadata, wal, data_dir)

  1. Set node state to Syncing.
  2. Build a SyncClient.
  3. Determine if this is a new node (no existing data) or a reconnect:
     • New node: call sync_client.join_and_sync().
     • Existing node: call sync_client.reconnect_sync().
  4. Retry up to 5 times on failure.
  5. On success: set state to Active.
  6. On failure after retries: set state to Active with needs_sync = true (allows the node to operate but flags it for later re-sync).

4. Startup Sync

File: src/cluster/sync_client.rs

join_and_sync() -- Full join flow for new nodes

  1. Pick a sync peer via pick_sync_peer():
     • First active peer from membership.
     • If none, probe fallback addresses from config.
  2. Send join request -- POST /internal/membership/join with JoinRequest { node_id, addr }.
  3. Get manifest -- GET /internal/sync/manifest from the peer. Returns SyncManifest { node_id, wal_last_seq, databases } containing all database/RP/measurement definitions and Parquet file listings.
  4. Sync metadata -- POST /internal/sync/metadata:
     • Receives a JSON snapshot of all metadata entries.
     • Imports databases, measurements, Parquet file entries, users, tombstones, and CQ definitions via import_metadata_entry().
  5. Sync Parquet files -- for each Parquet file in the manifest:
     • Fetch file content from the peer via GET /internal/sync/file?path=....
     • Verify the CRC32 checksum.
     • Write to local storage.
     • Downloads are parallelized with sync_max_concurrent_files concurrency.
  6. WAL catchup -- POST /internal/sync/wal-catchup:
     • Stream WAL entries from the peer starting from the local WAL sequence.
     • Append each entry to the local WAL.
  7. Verify -- build local Merkle trees and compare with the peer via POST /internal/sync/verify. Set state to Active even if verification fails (to avoid blocking).

5. Reconnect Sync

File: src/cluster/sync_client.rs

reconnect_sync() -- For existing nodes reconnecting after downtime

The reconnect strategy is chosen based on the WAL sequence gap between the local node and the peer:

Gap Analysis:
  local_wal_seq vs peer_wal_seq
       |
       v
  +-------------------+
  | gap == 0          | --> Merkle verify only
  | gap <= 10,000     | --> WAL catchup + verify
  | gap > 10,000      | --> Full metadata + delta sync + WAL catchup
  +-------------------+

Gap == 0 (no missed writes)

  1. Build local Merkle trees.
  2. POST /internal/sync/verify to a peer.
  3. If divergent buckets exist -> delta sync those buckets.
  4. If consistent -> done.

Small gap (<= 10,000 entries)

  1. WAL catchup from the peer (fetch WAL entries from local_seq to peer_seq).
  2. Merkle verify.
  3. If divergent -> delta sync.

Large gap (> 10,000 entries)

  1. Full metadata sync (re-import all metadata).
  2. Delta sync via Merkle comparison.
  3. WAL catchup.
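The strategy choice reduces to a pure function of the sequence gap. A sketch using the 10,000-entry threshold documented above (the enum and function names are illustrative):

```rust
// Gap-based reconnect strategy from Section 5. The 10,000-entry threshold is
// the documented one; ReconnectStrategy and choose_strategy are hypothetical
// names for illustration.
#[derive(Debug, PartialEq, Eq)]
pub enum ReconnectStrategy {
    MerkleVerifyOnly, // gap == 0: verify, delta-sync only divergent buckets
    WalCatchup,       // gap <= 10,000: replay missed WAL entries, then verify
    FullResync,       // gap > 10,000: full metadata + delta sync + WAL catchup
}

pub fn choose_strategy(local_wal_seq: u64, peer_wal_seq: u64) -> ReconnectStrategy {
    // saturating_sub: a peer behind us implies no catch-up work on our side
    let gap = peer_wal_seq.saturating_sub(local_wal_seq);
    match gap {
        0 => ReconnectStrategy::MerkleVerifyOnly,
        1..=10_000 => ReconnectStrategy::WalCatchup,
        _ => ReconnectStrategy::FullResync,
    }
}
```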

6. Raft Consensus

Files: src/cluster/raft/

HyperbyteDB integrates openraft for schema mutation consensus.

Type configuration

// src/cluster/raft/mod.rs
struct TypeConfig;
impl openraft::RaftTypeConfig for TypeConfig {
    type D = ClusterRequest;     // Log entry data
    type R = ClusterResponse;    // Apply result
    type NodeId = u64;
    type Node = BasicNode;
    type SnapshotData = Cursor<Vec<u8>>;
    type AsyncRuntime = TokioRuntime;
}

pub type HyperbytedbRaft = openraft::Raft<TypeConfig>;

Cluster request types

// src/cluster/raft/types.rs
pub enum ClusterRequest {
    SetNodeState { node_id: u64, state: NodeState },
    SchemaMutation(MutationRequest),
}

pub struct ClusterResponse {
    pub ok: bool,
    pub message: String,
}

RaftStore (log storage)

File: src/cluster/raft/log_store.rs

RocksDB-backed storage with three column families:

Column Family | Purpose
meta | Raft vote state, last purged log ID
logs | Raft log entries (key: log ID as big-endian u64)
state | State machine: last applied, membership, cluster data

Implements RaftStorage, RaftLogReader, and RaftSnapshotBuilder.

State machine application

File: src/cluster/raft/state_machine.rs

When a ClusterRequest is committed by Raft:

  • SetNodeState: Updates the local SharedMembership.
  • SchemaMutation: Calls the appropriate MetadataPort method:
  • CreateDatabase -> metadata.create_database()
  • DropDatabase -> metadata.drop_database()
  • Delete -> metadata.store_tombstone()
  • CreateUser -> metadata.create_user()
  • etc.

Raft-to-membership synchronization

sync_raft_membership_to_shared() propagates Raft membership changes to the local SharedMembership. Raft voters are mapped to Active or Syncing states.

Network transport

File: src/cluster/raft/network.rs

Raft messages are exchanged via HTTP:

Endpoint | Purpose
POST /internal/raft/append | AppendEntries RPC
POST /internal/raft/vote | RequestVote RPC
POST /internal/raft/snapshot | InstallSnapshot RPC
POST /internal/raft/membership | Membership change

Configuration

Parameter | Default | Description
raft_heartbeat_interval_ms | 300 | Raft heartbeat interval
raft_election_timeout_ms | 1000 | Raft election timeout
raft_snapshot_threshold | 1000 | Log entries before snapshot

7. Write Replication

Files: src/application/peer_ingestion_service.rs, src/cluster/peer_client.rs

Per-node modes

[cluster.replication] selects this node's coordinator behavior. Receivers always serve both styles from the same /internal/replicate endpoint, so any combination of modes across the cluster is safe.

Mode | Coordinator behavior
async (default) | Fire-and-forget HTTP fan-out. Returns to the client immediately after the local WAL append. Failures retry and trip hinted handoff.
sync_quorum | Fan out with X-Hyperbytedb-Sync: true and await W peer acks before returning to the client. W = sync_quorum.min_acks.resolve(active_peers) -- self is never counted.

Async flow (mode = "async")

  1. Client writes to any node.
  2. PeerIngestionService::ingest():
     • Parse line protocol, register metadata, append to the local WAL.
     • Dispatch to PeerClient::replicate_write() (spawns a background task).
     • Return 204 No Content to the client.
  3. PeerClient::replicate_write():
     • Push the batch onto the bounded outbound coalescer.
     • The outbound loop coalesces consecutive batches with the same (db, rp, precision) and contiguous wal_seq (up to replication_max_coalesce_body_bytes).
     • For each peer, spawn a task that POSTs to /internal/replicate with the line-protocol body, X-Hyperbytedb-* routing headers, and X-Hyperbytedb-Replicated: true.
     • On success (2xx): replication_log.set_wal_ack(peer_id, wal_seq).
     • On failure: retry with exponential backoff; on exhaustion, push to hinted handoff.

Sync quorum flow (mode = "sync_quorum")

  1. Client writes to any node. PeerIngestionService::ingest() parses, registers metadata, and appends to the local WAL.
  2. Resolve required = min_acks.resolve(active_peers().len()). If required == 0 (single-node cluster), return Ok(()) immediately.
  3. Acquire one shared inflight permit from batch_semaphore (sync and async share the same backpressure budget).
  4. Spawn one per-peer task; each sets X-Hyperbytedb-Sync: true and parses the peer's 200 OK body for ack_seq. Per-peer retry/backoff is identical to the async path.
  5. The coordinator awaits acks via a select_all loop with deadline ack_timeout_ms:
     • Once required peers ack, return 204 No Content to the client.
     • On timeout, return 504 Gateway Timeout (ReplicationQuorumTimeout) -- the in-flight peer tasks continue, so durability still ratchets forward in the background.
  6. Receiver: same ReplicationApplyQueue as the async path; the only difference is that the handler awaits the apply oneshot before responding (see handle_replicate_write in src/adapters/http/peer_handlers.rs).
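The quorum resolution in step 2 can be sketched as a pure function. This is a hedged reading of the configuration table in Section 14: integer values are clamped to the active peer count, and "majority" is interpreted here as floor(N/2) peer acks with N counting the full cluster (active peers plus self) -- that interpretation is an assumption:

```rust
// Hypothetical sketch of min_acks resolution for mode = "sync_quorum".
// Clamping and the "majority" -> floor(N/2) rule follow the Section 14 table;
// reading N as active_peers + 1 (self) is an assumption of this sketch.
pub enum MinAcks {
    Majority,
    Fixed(usize),
}

impl MinAcks {
    /// Required number of PEER acks given the current active peer count
    /// (self is never counted as an ack).
    pub fn resolve(&self, active_peers: usize) -> usize {
        match self {
            // floor(N/2) with N = active_peers + 1: the local write plus
            // floor(N/2) peer acks is a majority of N copies.
            MinAcks::Majority => (active_peers + 1) / 2,
            // Integers are clamped to active_peers().len().
            MinAcks::Fixed(k) => (*k).min(active_peers),
        }
    }
}
```

Note that resolve(0) is 0 for both variants, which matches the single-node short-circuit in step 2.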

Retry behavior (both modes)

  • Base delay: 1 second.
  • Max delay: 30 seconds.
  • Max retries: replication_max_retries (default 5).
  • Backoff: min(delay * 2, 30s).
  • On exhaustion: push to hinted handoff (when configured) and return failure.
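The schedule above is a standard capped exponential backoff. A sketch as a pure function (the function name is illustrative):

```rust
use std::time::Duration;

/// Capped exponential backoff from Section 7: delays of 1s, 2s, 4s, ...
/// doubling per attempt and capped at 30s. `backoff_delay` is a
/// hypothetical name for illustration.
pub fn backoff_delay(attempt: u32) -> Duration {
    // 2^attempt seconds, with the shift bounded so it cannot overflow;
    // 2^5 = 32 already exceeds the 30s cap applied below.
    let secs = 1u64 << attempt.min(5);
    Duration::from_secs(secs.min(30))
}
```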

Mixed-mode and rolling restart

The mode is per-node coordinator config. Receivers ignore the mode entirely and react only to X-Hyperbytedb-Sync. Concretely:

  • [A=sync_quorum, B=async, C=async]: A's writes await acks from B and C; B's and C's writes remain async. Safe.
  • Rolling-restart node A out of sync_quorum: B and C in sync_quorum recompute required against the smaller active_peers() count and continue serving (e.g. 1 required peer ack instead of 2 in a 3-node cluster).
  • A new sync_quorum coordinator talking to an old binary that ignores X-Hyperbytedb-Sync still works -- the old peer 200s on success, which counts as one ack.

Loop prevention

The receiving node checks for X-Hyperbytedb-Replicated: true. If present, it:

  • Persists the write locally (WAL + metadata).
  • Does NOT re-replicate to other peers.


8. Mutation Replication

File: src/cluster/peer_client.rs

With Raft

When Raft is configured, schema mutations are routed through Raft consensus:

raft.client_write(ClusterRequest::SchemaMutation(mutation_request)).await

Raft ensures all nodes apply the mutation in the same order.

Without Raft (fallback)

Mutations are broadcast directly to all peers:

  1. Append to the local replication log: replication_log.append_mutation(request) -> returns seq.
  2. For each active peer:
     • POST /internal/replicate-mutation with MutationReplicateRequest { seq, origin_node_id, mutation }.
     • On success: replication_log.set_mutation_ack(peer_id, seq).
     • On failure: retry with exponential backoff.

MutationRequest variants

pub enum MutationRequest {
    CreateDatabase(String),
    DropDatabase(String),
    CreateRetentionPolicy { db: String, rp: RetentionPolicy },
    CreateUser { username: String, password_hash: String, admin: bool },
    DropUser(String),
    Delete { database: String, measurement: String, predicate_sql: String },
    CreateContinuousQuery { database: String, name: String, definition: ContinuousQueryDef },
    DropContinuousQuery { database: String, name: String },
}

Deduplication

The receiving node calls replication_log.check_and_record_mutation(origin_node_id, seq). This returns true only if seq is greater than the last applied sequence for that origin, preventing duplicate application.
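The dedup rule is a per-origin high-water mark. An illustrative in-memory sketch -- the real version persists these marks in RocksDB, and the type name here is hypothetical:

```rust
use std::collections::HashMap;

// Illustrative sketch of check_and_record_mutation: a per-origin high-water
// mark. A HashMap stands in for the RocksDB-backed replication log.
#[derive(Default)]
pub struct MutationDedup {
    last_applied: HashMap<u64, u64>, // origin_node_id -> last applied seq
}

impl MutationDedup {
    /// Returns true (and records seq) only if seq is greater than the last
    /// applied sequence for this origin; duplicates and reorders return false.
    pub fn check_and_record(&mut self, origin_node_id: u64, seq: u64) -> bool {
        let last = self.last_applied.entry(origin_node_id).or_insert(0);
        if seq > *last {
            *last = seq;
            true
        } else {
            false
        }
    }
}
```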


9. Replication Log

File: src/cluster/replication_log.rs

The replication log is a RocksDB-backed store that tracks WAL and mutation acknowledgements from peers.

Key layout

Key Pattern | Value | Purpose
repl_ack:{peer_id} | u64 (big-endian) | Last WAL sequence acked by peer
mutation_log:{seq:016x} | Serialized MutationReplicateRequest | Mutation log entry
mutation_ack:{peer_id} | u64 (big-endian) | Last mutation seq acked by peer
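The key encodings matter for iteration order: the zero-padded hex sequence in mutation_log keys makes RocksDB's lexicographic key order match numeric order, and ack values are fixed-width big-endian u64. A sketch of the encoders (function names are illustrative):

```rust
// Sketch of the key layout above. Function names are hypothetical; the
// {seq:016x} padding and big-endian values follow the documented table.
pub fn wal_ack_key(peer_id: u64) -> Vec<u8> {
    format!("repl_ack:{}", peer_id).into_bytes()
}

/// Zero-padded hex so lexicographic byte order equals numeric seq order,
/// which lets range scans over `mutation_log:` read entries in sequence.
pub fn mutation_log_key(seq: u64) -> Vec<u8> {
    format!("mutation_log:{:016x}", seq).into_bytes()
}

/// Ack values are stored as fixed-width big-endian u64.
pub fn encode_ack(seq: u64) -> [u8; 8] {
    seq.to_be_bytes()
}
```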

Key operations

Method | Description
set_wal_ack(peer_id, seq) | Record WAL ack (only moves forward)
get_wal_ack(peer_id) | Get last WAL ack for a peer
min_wal_ack() | Minimum WAL ack across all peers (for safe truncation)
append_mutation(request) | Append mutation, return sequence number
read_mutations_from(from_seq, max) | Read mutation log entries
set_mutation_ack(peer_id, seq) | Record mutation ack
check_and_record_mutation(origin_id, seq) | Dedup: apply only if seq > last for origin
truncate_mutations_before(seq) | Delete old mutation log entries
remove_peer(peer_id) | Clear all ack keys for a peer

Safe WAL truncation

The flush service uses min_wal_ack() to determine how far the WAL can be safely truncated:

safe_truncate = min(chunk_max_seq, min_wal_ack_across_peers)

This ensures peers that are catching up can still read needed WAL entries.
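The two properties this relies on -- acks only move forward, and truncation is bounded by the slowest peer -- can be sketched in a few lines (an in-memory stand-in for the RocksDB-backed log; names are illustrative):

```rust
use std::collections::HashMap;

// Illustrative ack tracker for safe WAL truncation. A HashMap stands in for
// the RocksDB-backed replication log; names are hypothetical.
#[derive(Default)]
pub struct AckTracker {
    wal_acks: HashMap<u64, u64>, // peer_id -> last acked WAL seq
}

impl AckTracker {
    /// Record a WAL ack; stale (lower) acks are ignored so acks only move forward.
    pub fn set_wal_ack(&mut self, peer_id: u64, seq: u64) {
        let e = self.wal_acks.entry(peer_id).or_insert(0);
        if seq > *e {
            *e = seq;
        }
    }

    /// Minimum WAL ack across all peers; None when no peer acks are recorded.
    pub fn min_wal_ack(&self) -> Option<u64> {
        self.wal_acks.values().copied().min()
    }
}

/// safe_truncate = min(chunk_max_seq, min_wal_ack_across_peers)
pub fn safe_truncate(chunk_max_seq: u64, min_wal_ack: u64) -> u64 {
    chunk_max_seq.min(min_wal_ack)
}
```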


10. Anti-Entropy (Merkle Trees)

File: src/cluster/merkle.rs

Merkle trees provide a mechanism to detect data divergence between nodes without transmitting full file lists.

Tree structure

One Merkle tree per (database, retention_policy, measurement):

MerkleTreeSummary {
    database: String,
    rp: String,
    measurement: String,
    root_hash: String,
    buckets: Vec<MerkleBucket>,
}

MerkleBucket {
    bucket: String,        // "YYYY-MM-DD/HH" format
    hash: String,          // SHA-256 hash
    file_count: usize,
}

Bucket hashing

For each hourly bucket:

  1. Collect all Parquet files in that bucket from metadata.
  2. For each file, compute: SHA256(path || min_time_bytes || max_time_bytes || file_checksum).
  3. Sort file hashes.
  4. Bucket hash = SHA256(sorted_file_hashes_concatenated).

Root hashing

Root hash = SHA256(all_bucket_hashes_concatenated).

If there are no buckets, root hash = "empty".
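The essential property of the bucket hash is order independence: per-file hashes are sorted before combining. A structure-only sketch -- the real code uses SHA-256 over path/min_time/max_time/checksum, but std's DefaultHasher stands in here so the example needs no external crate, and ParquetFileMeta is a hypothetical stand-in for the metadata entries:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Structure-only sketch of bucket hashing. DefaultHasher is a stand-in for
// SHA-256; ParquetFileMeta is a hypothetical shape for the metadata entries.
#[derive(Clone)]
pub struct ParquetFileMeta {
    pub path: String,
    pub min_time: i64,
    pub max_time: i64,
    pub checksum: u32,
}

// Per-file hash over path || min_time_bytes || max_time_bytes || checksum.
fn file_hash(f: &ParquetFileMeta) -> u64 {
    let mut h = DefaultHasher::new();
    f.path.hash(&mut h);
    f.min_time.to_be_bytes().hash(&mut h);
    f.max_time.to_be_bytes().hash(&mut h);
    f.checksum.hash(&mut h);
    h.finish()
}

/// Hash one hourly bucket: file hashes are sorted before combining, so the
/// result does not depend on file enumeration order.
pub fn bucket_hash(files: &[ParquetFileMeta]) -> u64 {
    let mut hashes: Vec<u64> = files.iter().map(file_hash).collect();
    hashes.sort_unstable();
    let mut h = DefaultHasher::new();
    for fh in hashes {
        fh.hash(&mut h);
    }
    h.finish()
}
```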

Tree comparison

Function: compare_trees(local, remote) -> Vec<DivergentBucket>

Compares local and remote trees and returns buckets where hashes differ:

pub struct DivergentBucket {
    pub database: String,
    pub rp: String,
    pub measurement: String,
    pub bucket: String,
    pub local_hash: String,
    pub remote_hash: String,
}
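For a single tree, the comparison pairs buckets by name and reports every bucket whose hashes differ. A minimal sketch -- buckets present on only one side are treated as divergent, which is an assumption, and the return type is simplified to bucket names:

```rust
use std::collections::HashMap;

// Minimal sketch of tree comparison for one (db, rp, measurement). The real
// compare_trees returns DivergentBucket structs; bucket names suffice here.
#[derive(Debug, Clone)]
pub struct MerkleBucket {
    pub bucket: String, // "YYYY-MM-DD/HH"
    pub hash: String,
}

pub fn divergent_buckets(local: &[MerkleBucket], remote: &[MerkleBucket]) -> Vec<String> {
    let l: HashMap<&str, &str> =
        local.iter().map(|b| (b.bucket.as_str(), b.hash.as_str())).collect();
    let r: HashMap<&str, &str> =
        remote.iter().map(|b| (b.bucket.as_str(), b.hash.as_str())).collect();
    // Union of bucket names from both sides.
    let mut names: Vec<&str> = l.keys().copied().chain(r.keys().copied()).collect();
    names.sort_unstable();
    names.dedup();
    // A bucket diverges if the hashes differ, including when it is missing
    // on one side (None != Some).
    names
        .into_iter()
        .filter(|k| l.get(*k) != r.get(*k))
        .map(|k| k.to_string())
        .collect()
}
```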

MerkleCache

pub struct MerkleCache {
    pub generation: u64,
    pub trees: Vec<MerkleTreeSummary>,
}

Trees are rebuilt only when the metadata generation counter has changed (incremented on every register_parquet_file / unregister_parquet_file). This avoids expensive tree rebuilds when data hasn't changed.

Background anti-entropy

A background task runs every anti_entropy_interval_secs (default 60s):

  1. Build local Merkle trees (using cache if generation unchanged).
  2. Send POST /internal/sync/verify to a peer with local trees.
  3. If divergent buckets are found, trigger delta sync.

11. Delta Sync

File: src/cluster/sync_client.rs

delta_sync_from_verify()

When Merkle comparison reveals divergent buckets:

For each DivergentBucket:

  1. Fetch the peer's file list for that bucket.
  2. Compare with the local file list.
  3. Download missing or differing files.
  4. Verify CRC32 checksums.
  5. Register downloaded files in local metadata.

This is more efficient than a full sync because only divergent time buckets are transferred.


12. Graceful Drain

File: src/cluster/drain.rs

The drain procedure ensures no data loss when removing a node from the cluster.

DrainService::drain()

Step 1: Set node state to Draining
        (HTTP write handler starts rejecting writes with 503)
        |
Step 2: Flush all WAL entries to Parquet
        (flush_service.drain() -- loops until WAL is empty)
        |
Step 3: Wait for replication acks (up to 60 seconds)
        (loop: check if all peers have acked local WAL seq
         and mutation seq, sleep 2s between checks)
        |
Step 4: Verify data consistency with a peer
        (build local Merkle trees, POST /internal/sync/verify)
        |
Step 5: Notify peers of leave
        (POST /internal/membership/leave to all active peers)
        |
Step 6: Set node state to Leaving

Step details

Step 1 -- Reject writes: The HTTP write handler checks node state and returns 503 Service Unavailable with an optional X-Hyperbytedb-Redirect header for Draining, Leaving, Syncing, or Joining states.

Step 2 -- Flush WAL: flush_service.drain() repeatedly calls flush() until no WAL entries remain. This ensures all ingested data is persisted as Parquet files.

Step 3 -- Wait for acks: Polls replication_log.get_wal_ack(peer_id) and get_mutation_ack(peer_id) for each active peer. Waits until all peers have acked up to the local WAL and mutation sequence numbers. Times out after 60 seconds with a warning.

Step 4 -- Merkle verify: Builds local Merkle trees and sends a verify request to one active peer. Logs the result but proceeds even if verification fails.

Step 5 -- Notify peers: Sends POST /internal/membership/leave to all active peers so they remove this node from their membership.

Step 6 -- Set Leaving: Final state transition. The node can be safely shut down.


13. Peer Internal Endpoints

Files: src/adapters/http/peer_handlers.rs, src/adapters/http/raft_handlers.rs

Replication endpoints

Endpoint | Method | Purpose
/internal/replicate | POST | Receive replicated write data
/internal/replicate-mutation | POST | Receive replicated schema mutation

Membership endpoints

Endpoint | Method | Purpose
/internal/membership/join | POST | Handle join request from new node
/internal/membership/leave | POST | Handle leave notification
/internal/membership/heartbeat | POST | Receive heartbeat from peer

Sync endpoints

Endpoint | Method | Purpose
/internal/sync/manifest | GET | Return full sync manifest (includes Parquet inventory with origin_node_id per file when present)
/internal/sync/metadata | GET | Export metadata snapshot for joiners
/internal/sync/parquet/{*path} | GET | Serve a Parquet file by manifest path for sync or repair
/internal/sync/wal | GET | Stream WAL entries (from_seq, max_entries query params)
/internal/sync/trigger | POST | Ask peer to run sync / catch-up (leader-driven)
/internal/sync/verify | POST | Merkle tree comparison (anti-entropy)
/internal/bucket-hash | GET | Canonical SHA-256 for one (db, rp, measurement, bucket, origin); optional bucket_duration_nanos

Parquet consistency after replication: compaction-driven verified merge and membership self-repair use manifest + bucket-hash to heal divergent or missing slices. See Deep Dive: Self-Repair.

Raft endpoints

Endpoint | Method | Purpose
/internal/raft/append | POST | Raft AppendEntries RPC
/internal/raft/vote | POST | Raft RequestVote RPC
/internal/raft/snapshot | POST | Raft InstallSnapshot RPC
/internal/raft/membership | POST | Raft membership change

Drain endpoint

Endpoint | Method | Purpose
/internal/drain | POST | Trigger drain procedure

14. Cluster Configuration

File: src/config.rs -- ClusterConfig

Parameter | Default | Description
enabled | false | Enable cluster mode
node_id | 1 | Unique node identifier
cluster_addr | "127.0.0.1:8086" | Address other nodes use to reach this node
peers | "" | Comma-separated list of peer addresses
heartbeat_interval_secs | 2 | How often to send heartbeats
heartbeat_miss_threshold | 5 | Missed heartbeats before marking disconnected
anti_entropy_interval_secs | 60 | Merkle tree verification interval
replication_log_dir | "./replication_log" | RocksDB directory for replication tracking
raft_dir | "./raft" | RocksDB directory for Raft state
sync_max_concurrent_files | 4 | Max concurrent file downloads during sync
replication_max_retries | 5 | Max retries for failed replications
raft_heartbeat_interval_ms | 300 | Raft heartbeat interval (milliseconds)
raft_election_timeout_ms | 1000 | Raft election timeout (milliseconds)
raft_snapshot_threshold | 1000 | Log entries before Raft snapshot
replication.mode | "async" | Coordinator replication mode: "async" (default) or "sync_quorum"
replication.ack_timeout_ms | 5000 | sync_quorum worst-case latency budget; on timeout the client gets 504 and unacked peers fall back to hinted handoff
replication.sync_quorum.min_acks | "majority" | Required PEER acks for sync_quorum. "majority" resolves to floor(N/2) peer acks at request time; integers are clamped to active_peers().len()
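Putting the parameters together, a hedged example configuration for one node of a three-node cluster. The [cluster] / [cluster.replication] table nesting follows the section names used above; the exact key placement in the real config file is an assumption:

```toml
# Illustrative sketch only -- key nesting inferred from the parameter names above.
[cluster]
enabled = true
node_id = 2
cluster_addr = "10.0.0.2:8086"
peers = "10.0.0.1:8086,10.0.0.3:8086"
heartbeat_interval_secs = 2
heartbeat_miss_threshold = 5
anti_entropy_interval_secs = 60
replication_log_dir = "./replication_log"
raft_dir = "./raft"

[cluster.replication]
mode = "sync_quorum"
ack_timeout_ms = 5000

[cluster.replication.sync_quorum]
min_acks = "majority"
```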

15. Metrics

Replication metrics

Metric | Type | Description
hyperbytedb_replication_writes_total | counter | Write replication attempts
hyperbytedb_replication_errors_total | counter | Failed write replications
hyperbytedb_replication_duration_seconds | histogram | Replication latency
hyperbytedb_replication_mutations_total | counter | Mutation replication attempts
hyperbytedb_replication_mode{mode} | gauge | 1 for the currently configured coordinator mode, 0 for the others (mode ∈ {async, sync_quorum})
hyperbytedb_replication_sync_acks_total{outcome} | counter | Outcome of sync_quorum quorum waits (outcome ∈ {ok, timeout, error})
hyperbytedb_replication_sync_duration_seconds | histogram | sync_quorum time from coordinator-accept to required-acks-received
hyperbytedb_replication_sync_peer_ack_seconds{peer} | histogram | Per-peer ack RTT in sync_quorum mode (helps spot the straggler)
hyperbytedb_replication_sync_required_acks | gauge | Currently resolved required peer-ack count (tracks membership changes)
hyperbytedb_replication_sync_apply_received_total | counter | Receiver count of inbound sync_quorum requests (where X-Hyperbytedb-Sync: true)
hyperbytedb_replication_sync_apply_errors_total | counter | Receiver errors while applying a sync_quorum request (returned as 500 to coordinator)

Cluster state metrics

Metric | Type | Description
hyperbytedb_cluster_node_state | gauge | Current node state (0=Joining, 1=Syncing, 2=Active, 3=Disconnected, 4=Draining, 5=Leaving)
hyperbytedb_drain_total | counter | Drain procedures initiated
hyperbytedb_cluster_peers_active | gauge | Number of active peers
hyperbytedb_uptime_seconds | gauge | Node uptime in seconds