
Deep Dive: Compaction-Driven Parquet Self-Repair

This document describes how HyperbyteDB detects and corrects divergent or missing Parquet slices on cluster nodes after data has been replicated or compacted. It complements Deep Dive: Compaction and Deep Dive: Clustering.

Replication and WAL catch-up keep the open tail reasonably aligned, but closed Parquet files can still drift (partial failures, restarts, or metadata skew). The compactor acts as a periodic validator and repair agent against peers that authored the data.


Table of Contents

  1. Problems this addresses
  2. Sync manifest provenance
  3. Canonical bucket content hash
  4. Verified compaction (multi-origin buckets)
  5. Membership-driven repair (missing slices)
  6. HTTP: /internal/bucket-hash
  7. Configuration
  8. Metrics and logs
  9. Operational guidance
  10. Code map

1. Problems this addresses

| Issue | Why it happens | Repair approach |
|---|---|---|
| Wrong bytes for a peer's slice in a time bucket | Leader/follower applied different batches, corruption, partial fetch | Compare bucket hash with the authoring peer; replace local files for that origin in that bucket |
| Missing slice locally (no metadata rows for peer P in hour H) | Never replicated, or metadata lost | Discover candidates from P's sync manifest; hash compare (empty vs non-empty); fetch |
| Bucket duration mismatch on the hash API | Old servers hard-coded 1h while compaction used 1d | Client sends bucket_duration_nanos; server defaults to the configured compaction duration |

2. Sync manifest provenance

File: src/cluster/sync.rs (ParquetFileManifest, build_manifest)

Each row in the sync manifest now includes:

origin_node_id: Option<u64>   // omitted in JSON from older peers → deserializes as None

build_manifest fills this from metadata (get_parquet_files_with_times), so the authoring node is visible to remote callers without inferring it from the file path alone.

Backward compatibility: Peers that omit origin_node_id still work; the repair logic falls back to the flush naming convention (_n{node_id}_ in the filename). Helpers: manifest_entry_matches_origin, parquet_path_suggests_origin.
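The fallback can be sketched as follows. Struct and helper names mirror the ones named above, but the signatures and fields here are illustrative, not the actual src/cluster/sync.rs definitions.

```rust
// Illustrative sketch of origin matching with the legacy-filename fallback.
#[derive(Debug)]
struct ManifestEntry {
    path: String,
    origin_node_id: Option<u64>, // None when the peer predates the field
}

/// Fallback: the flush naming convention embeds the author as `_n{node_id}_`.
fn parquet_path_suggests_origin(path: &str, node_id: u64) -> bool {
    path.contains(&format!("_n{}_", node_id))
}

/// Prefer explicit metadata; fall back to the filename convention.
fn manifest_entry_matches_origin(entry: &ManifestEntry, node_id: u64) -> bool {
    match entry.origin_node_id {
        Some(id) => id == node_id,
        None => parquet_path_suggests_origin(&entry.path, node_id),
    }
}
```

Older peers whose JSON omits the field deserialize as `None` and are matched purely by path, so mixed-version clusters keep repairing.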


3. Canonical bucket content hash

File: src/cluster/data_hash.rs (compute_bucket_hash, BucketHashResponse)

For a given (database, retention policy, measurement, bucket_start_nanos, bucket_duration_nanos, origin_node_id):

  1. Select Parquet files whose metadata origin equals origin_node_id and whose min_time falls in [bucket_start, bucket_start + bucket_duration).
  2. If none match → response hash "empty", file_count: 0.
  3. Otherwise read each file from storage, merge Arrow batches, sort canonically by time (then column order), serialize to Arrow IPC stream bytes, SHA-256 hex digest.

The same function runs locally and behind GET /internal/bucket-hash so both sides use identical semantics when durations and parameters match.
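A std-only sketch of the selection and "empty" semantics above. The real compute_bucket_hash reads Parquet from storage, merges Arrow batches, sorts canonically, and SHA-256-hashes the Arrow IPC stream bytes; here DefaultHasher stands in for the digest and a payload byte vector stands in for file contents, purely to show the filter, the "empty" sentinel, and the canonical ordering.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

struct FileMeta {
    origin_node_id: u64,
    min_time: i64,    // nanoseconds
    payload: Vec<u8>, // stand-in for canonical Arrow IPC bytes
}

fn compute_bucket_hash(
    files: &[FileMeta],
    bucket_start: i64,
    bucket_duration: i64,
    origin: u64,
) -> (String, usize) {
    // Step 1: files authored by `origin` whose min_time falls in the bucket.
    let mut selected: Vec<&FileMeta> = files
        .iter()
        .filter(|f| {
            f.origin_node_id == origin
                && f.min_time >= bucket_start
                && f.min_time < bucket_start + bucket_duration
        })
        .collect();
    // Step 2: no matching files hashes to the sentinel "empty".
    if selected.is_empty() {
        return ("empty".to_string(), 0);
    }
    // Step 3 (simplified): canonical order, then digest the merged bytes.
    selected.sort_by_key(|f| f.min_time);
    let mut h = DefaultHasher::new();
    for f in &selected {
        f.payload.hash(&mut h);
    }
    (format!("{:016x}", h.finish()), selected.len())
}
```

Because both sides run the same function over the same parameters, equal inputs yield equal digests regardless of which node computes them.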


4. Verified compaction (multi-origin buckets)

File: src/application/compaction_service.rs (verified_compact)

When it runs: After each periodic compact() tick, if the node was bootstrapped with cluster membership (with_verified_compaction in src/bootstrap.rs).

Age gate: Only buckets whose data is older than verified_compaction_age_secs (default 3600s) participate, which keeps the pass from fighting the in-memory / WAL tail.

Algorithm (per measurement):

  1. Group local files by (time_bucket, origin_node_id).
  2. For each remote origin present in that bucket’s map, resolve the peer address from membership, compute local hash via compute_bucket_hash, fetch remote hash from GET /internal/bucket-hash (including bucket_duration_nanos aligned with local compaction config).
  3. On mismatch, fetch_and_replace_origin_data: remove local files for that origin in that bucket, pull the peer’s manifest, download matching Parquet via /internal/sync/parquet/... only for manifest rows that match the origin (manifest_entry_matches_origin).
  4. When all remote origins in the bucket verify, optionally merge multi-origin inputs into compacted outputs (existing verified-compaction merge path).
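Step 1 of the algorithm above can be sketched with std only: align each file's min_time down to its bucket start and group by (bucket, origin). The types and field names are illustrative, not the service's actual ones.

```rust
use std::collections::HashMap;

#[derive(Clone)]
struct LocalFile {
    min_time: i64, // nanoseconds
    origin_node_id: u64,
    path: String,
}

/// Align a timestamp down to the start of its compaction bucket.
fn bucket_start(ts: i64, bucket_duration_nanos: i64) -> i64 {
    ts - ts.rem_euclid(bucket_duration_nanos)
}

/// Group local file metadata by (time_bucket, origin_node_id).
fn group_by_bucket_and_origin(
    files: &[LocalFile],
    bucket_duration_nanos: i64,
) -> HashMap<(i64, u64), Vec<LocalFile>> {
    let mut groups: HashMap<(i64, u64), Vec<LocalFile>> = HashMap::new();
    for f in files {
        let key = (bucket_start(f.min_time, bucket_duration_nanos), f.origin_node_id);
        groups.entry(key).or_default().push(f.clone());
    }
    groups
}
```

Each (bucket, remote origin) entry then becomes one local-vs-remote hash comparison against the peer that authored that origin's data.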

Limitation addressed elsewhere: If the bucket only contains this node’s origin (no local metadata for peer files), the old loop never contacted peers. Membership-driven repair (below) closes that gap.


5. Membership-driven repair (missing slices)

File: src/application/compaction_service.rs (repair_missing_peer_slices)

When it runs: After verified_compact on the same tick, if self_repair_enabled is true and max_repair_checks_per_cycle > 0.

Peer set: Active members only (active_peers), excluding self—unreachable nodes are skipped for that cycle.

Per cycle (bounded work):

  1. Sort peers by node_id for determinism.
  2. For each peer P, GET /internal/sync/manifest once.
  3. Scan manifest entries where:
     • min_time is below the same age cutoff as verified compaction (verified_compaction_age_secs), and
     • the row is authored by P (manifest_entry_matches_origin with peer_id == P).
  4. Dedupe keys (P, db, rp, measurement, bucket_start) and collect at most max_repair_checks_per_cycle distinct repair targets.
  5. For each target: compare local vs remote bucket hashes; if they differ (including "empty" vs non-empty), run fetch_and_replace_origin_data toward P.

This discovers work from the author’s manifest even when this node has no local rows for P in that bucket.
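The dedupe-and-cap behavior described above might look like the following; RepairTarget is a hypothetical type standing in for whatever key the service actually derives from manifest rows.

```rust
use std::collections::HashSet;

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct RepairTarget {
    peer: u64,
    db: String,
    rp: String,
    measurement: String,
    bucket_start: i64,
}

/// Keep the first occurrence of each distinct key, up to the per-cycle cap.
fn collect_targets(
    candidates: impl IntoIterator<Item = RepairTarget>,
    max_per_cycle: usize,
) -> Vec<RepairTarget> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for t in candidates {
        if out.len() >= max_per_cycle {
            break; // remaining targets wait for the next compaction tick
        }
        if seen.insert(t.clone()) {
            out.push(t);
        }
    }
    out
}
```

Capping here, rather than after hashing, bounds both the number of hash comparisons and the worst-case number of fetches per tick.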


6. HTTP: /internal/bucket-hash

File: src/adapters/http/peer_handlers.rs (handle_bucket_hash, BucketHashQuery)

Query parameters:

| Parameter | Required | Description |
|---|---|---|
| db, rp, measurement | yes | Target series |
| bucket_nanos | yes | Bucket start (nanoseconds), same convention as compaction |
| origin_node_id | yes | Author node whose files are hashed |
| bucket_duration_nanos | no | If omitted or invalid, the server uses AppState.compaction_bucket_duration_nanos (from [compaction].bucket_duration) |

Clients should send bucket_duration_nanos so behavior stays explicit; the server default keeps older callers working and matches non-1h bucket settings.
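Assembling the query string from the table above; this sketch assumes the identifiers are URL-safe (no percent-encoding) and that the peer address comes from membership, as described in the verified-compaction section.

```rust
/// Build a bucket-hash request URL. Parameter names come from the
/// documented query contract; the function itself is illustrative.
fn bucket_hash_url(
    peer: &str,
    db: &str,
    rp: &str,
    measurement: &str,
    bucket_nanos: i64,
    origin_node_id: u64,
    bucket_duration_nanos: i64,
) -> String {
    format!(
        "http://{peer}/internal/bucket-hash?db={db}&rp={rp}&measurement={measurement}\
         &bucket_nanos={bucket_nanos}&origin_node_id={origin_node_id}\
         &bucket_duration_nanos={bucket_duration_nanos}"
    )
}
```

Always passing bucket_duration_nanos keeps both sides hashing the same window even when the remote server's [compaction].bucket_duration differs.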


7. Configuration

File: src/config.rs (CompactionConfig)

| Key | Default | Description |
|---|---|---|
| verified_compaction_age_secs | 3600 | Minimum data age before verified compaction / membership repair considers a bucket |
| bucket_duration | "1h" | Must match the semantics used in bucket hashing ("1h" or "1d" / "24h") |
| self_repair_enabled | true | Enable the membership-driven repair pass (cluster + compaction + membership bootstrap) |
| max_repair_checks_per_cycle | 128 | Max distinct (peer, db, rp, measurement, bucket) hash checks and potential fetches per compaction tick |

Environment variables follow the HYPERBYTEDB__COMPACTION__<KEY> pattern, e.g. HYPERBYTEDB__COMPACTION__SELF_REPAIR_ENABLED.
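A hypothetical config fragment, assuming the TOML layout implied by [compaction].bucket_duration above; the keys are the ones from the table, the values are just an example tuning.

```toml
# Example only: lower the per-tick repair cap on a busy cluster.
[compaction]
bucket_duration = "1d"              # must match bucket-hash semantics
verified_compaction_age_secs = 3600
self_repair_enabled = true
max_repair_checks_per_cycle = 64    # fewer hash checks / fetches per tick
```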


8. Metrics and logs

| Metric | Labels | Meaning |
|---|---|---|
| hyperbytedb_self_repair_hash_checks_total | pass = membership | Remote hash comparisons scheduled in membership repair |
| hyperbytedb_self_repair_hash_mismatch_total | pass = verified \| membership | Local vs remote hash differed before fetch |
| hyperbytedb_self_repair_origin_fetch_total | pass = verified \| membership | Successful fetch_and_replace_origin_data after mismatch |
| hyperbytedb_self_repair_errors_total | reason = manifest_fetch, local_hash, … | Failures along the repair path |

Existing compaction / verified compaction counters (e.g. hyperbytedb_verified_compaction_*) remain for merge-specific behavior.

Structured logs often include peer, db, rp, measurement, bucket, and hash values on mismatch.


9. Operational guidance

  • Cadence: Repair runs on the compaction interval; it is not a separate timer. Heavy clusters can tune max_repair_checks_per_cycle down to limit HTTP load, or temporarily set self_repair_enabled = false during investigations.
  • Authoritative copy: For a given origin id P, the peer at P’s address in membership is treated as the source for that origin’s slices when hashes disagree.
  • Large gaps: Missing historical data across many buckets may take several cycles to fully converge because of the per-cycle cap.
  • WAL vs Parquet: Open-data alignment is still primarily replication + WAL catch-up; this path heals materialized Parquet and metadata registration.

10. Code map

| Component | Location |
|---|---|
| Manifest + origin helpers | src/cluster/sync.rs |
| Bucket hash | src/cluster/data_hash.rs |
| Verified compaction + membership repair + fetch/replace | src/application/compaction_service.rs |
| Bucket-hash HTTP handler | src/adapters/http/peer_handlers.rs |
| App state: bucket duration for hash default | src/adapters/http/router.rs (AppState.compaction_bucket_duration_nanos), src/bootstrap.rs |
| Parquet sync registers origin | src/cluster/sync_client.rs |

Related design notes: replication design (wire path); deep-dive-compaction.md (merge mechanics).