
Deep Dive: Compaction

This document provides an exhaustive description of the HyperbyteDB compaction service. It covers trigger conditions, time-bucket grouping, multi-file Parquet merging via Arrow, schema evolution, size-based output splitting, orphan cleanup, and error handling.


Table of Contents

  1. Overview
  2. Configuration
  3. Trigger Conditions
  4. Time Bucket Grouping
  5. Multi-File Parquet Merge
  6. Schema Unification and Type Widening
  7. Batch Conformance
  8. Sort by Time
  9. Size-Based Output Splitting
  10. Output Registration and Cleanup
  11. Compacted File Naming
  12. Orphan File Cleanup
  13. Tombstone Non-Application
  14. Error Handling and Recovery
  15. Metrics
  16. Compaction Lifecycle Summary
  17. Cluster mode: verification and self-repair

1. Overview

The compaction service merges multiple small Parquet files within the same time partition into fewer, larger files. This reduces file count, improves query performance (fewer files for chDB to scan), and produces better-compressed output.

Key design decisions:

  • Compaction operates entirely via Arrow (no chDB involvement).
  • Tombstones are NOT applied during compaction -- they remain in metadata for query-time filtering.
  • Compaction is idempotent -- crashes leave old files intact for retry.
  • Output files are split when they exceed a target size.

Source modules (src/application/)

File Role
compaction_service.rs Orchestration: CompactionService — background loop (compact()), aggressive compact_all() (with bounded concurrency), per-bucket file grouping, calls into merge + metadata + storage, verified compaction and membership self-repair in cluster mode, orphan Parquet cleanup, metrics (emit_file_count_gauges).
compaction_merge.rs Merge engine: merge_parquet_files, streaming merge_parquet_streaming (k-way merge by time over ParquetRecordBatchReader streams), schema merge + conform_batch, merge_to_sorted_batch + write_batch_with_limit as fallback when inputs are not time-sorted per file, and shared helpers such as extract_time_bounds for output metadata.
Before compaction:
  cpu/2024-01-15/12_a1b2c3d4.parquet  (1 MB)
  cpu/2024-01-15/12_e5f6g7h8.parquet  (2 MB)
  cpu/2024-01-15/12_i9j0k1l2.parquet  (500 KB)

After compaction:
  cpu/2024-01-15/12_cm3n4o5p.parquet  (3.5 MB, sorted by time)

2. Configuration

File: src/config.rs -- CompactionConfig

Parameter Default Description
enabled true Enable/disable background compaction
interval_secs 30 How often to check for compaction opportunities
min_files_to_compact 2 Minimum total file count for a measurement before any compaction triggers
target_file_size_mb 256 Approximate target output file size in MB
bucket_duration "1h" Time bucket granularity: "1h" (hourly) or "1d" (daily)
verified_compaction_age_secs 3600 Minimum age before cluster verification / repair considers a time bucket
self_repair_enabled true Enable membership-driven missing-slice repair (cluster bootstrap)
max_repair_checks_per_cycle 128 Cap hash checks and repair attempts per tick
compact_all_max_inflight 8 Max concurrent compact_measurement tasks when compact_all() runs (semaphore)
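The parameters above can be sketched as a plain Rust struct with matching defaults. The field names and types here are assumptions inferred from the table, not a copy of src/config.rs:

```rust
// Hypothetical sketch of CompactionConfig; defaults mirror the table above.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    pub enabled: bool,
    pub interval_secs: u64,
    pub min_files_to_compact: usize,
    pub target_file_size_mb: u64,
    pub bucket_duration: String,
    pub verified_compaction_age_secs: u64,
    pub self_repair_enabled: bool,
    pub max_repair_checks_per_cycle: usize,
    pub compact_all_max_inflight: usize,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            interval_secs: 30,
            min_files_to_compact: 2,
            target_file_size_mb: 256,
            bucket_duration: "1h".to_string(),
            verified_compaction_age_secs: 3600,
            self_repair_enabled: true,
            max_repair_checks_per_cycle: 128,
            compact_all_max_inflight: 8,
        }
    }
}

fn main() {
    let cfg = CompactionConfig::default();
    println!("target {} MB, tick every {}s", cfg.target_file_size_mb, cfg.interval_secs);
}
```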

Bucket duration

The bucket_duration determines how files are grouped for compaction:

Value Nanoseconds Behavior
"1h" 3,600,000,000,000 Merge files within the same hour
"1d" / "24h" 86,400,000,000,000 Merge all 24 hourly files into one day-level file

Daily buckets are useful for workloads with wide time-range queries, as they dramatically reduce the number of files chDB must scan.


3. Trigger Conditions

Periodic compaction (compact())

The background service runs compact() every interval_secs:

  1. Iterate all databases, retention policies, and measurements.
  2. For each measurement, fetch all Parquet files with time ranges via metadata.get_parquet_files_with_times().
  3. Skip if total file count < min_files_to_compact (default 2). This avoids compaction overhead for measurements with few files.
  4. Group files by time bucket.
  5. Compact only buckets with >= 2 files. Single-file buckets are left untouched.

Aggressive compaction (compact_all())

Called via POST /internal/compact or during operational maintenance:

  1. Same grouping logic but no min_files_to_compact threshold.
  2. Every bucket with >= 2 files is compacted.
  3. Spawns one tokio::spawn task per (db, rp, measurement), with concurrency capped by compact_all_max_inflight (default 8) via a semaphore — not unbounded parallelism.
  4. Returns a CompactSummary with per-measurement progress.

4. Time Bucket Grouping

Files are grouped into time buckets using integer division on min_time:

bucket = (min_time / bucket_duration_nanos) * bucket_duration_nanos

For example, with hourly buckets (bucket_duration_nanos = 3,600,000,000,000):

  • A file with min_time = 1705312800000000000 (2024-01-15T10:00:00Z) -> bucket 1705312800000000000
  • A file with min_time = 1705313100000000000 (2024-01-15T10:05:00Z) -> same bucket 1705312800000000000

Files in the same bucket are candidates for merging.
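The bucket computation is pure integer math and can be sketched directly (the real helper lives in compaction_service.rs; the function name here is illustrative):

```rust
/// Hour-granularity bucket duration in nanoseconds.
const HOUR_NANOS: i64 = 3_600_000_000_000;

/// Floor a file's min_time to the start of its time bucket.
/// Integer division truncates, so every timestamp inside the same
/// hour maps to the same bucket key.
fn bucket_for(min_time: i64, bucket_duration_nanos: i64) -> i64 {
    (min_time / bucket_duration_nanos) * bucket_duration_nanos
}

fn main() {
    // 2024-01-15T10:00:00Z and 10:05:00Z land in the same hourly bucket.
    let a = bucket_for(1_705_312_800_000_000_000, HOUR_NANOS);
    let b = bucket_for(1_705_313_100_000_000_000, HOUR_NANOS);
    assert_eq!(a, b);
    assert_eq!(a, 1_705_312_800_000_000_000);
}
```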


5. Multi-File Parquet Merge

Module: src/application/compaction_merge.rs

Entry point: merge_parquet_files(storage, paths, target_file_size_bytes) — async; prepares stream sources, then runs CPU-heavy work on the blocking pool.

The merge operates entirely in Arrow (no chDB). There are two paths.

Path A — Streaming k-way merge (default)

Used when each input file has rows non-decreasing in time (true for WAL flush output and files already produced by compaction).

  1. Prepare streams: For each path, storage.prepare_parquet_stream(path) yields a ParquetStreamSource: a local filesystem path, or (for S3) a temp file populated from a full download so Parquet can still be read sequentially from disk.
  2. Open readers: One ParquetRecordBatchReader per file (via ParquetRecordBatchReaderBuilder + std::fs::File).
  3. Unified schema: Column union and type widening from each file’s Parquet schema (see Section 6).
  4. Conform each loaded batch to the unified schema (Section 7).
  5. K-way merge: A min-heap ordered by normalized time nanoseconds selects the next row across files; single-row slices are accumulated into batches. Flushes occur when estimated size or row count exceeds thresholds (aligned with target_file_size_bytes and an internal row cap).
  6. Write: write_batch_with_limit emits ZSTD Parquet bytes and time bounds for metadata.

This path avoids loading every file’s bytes into a single Bytes buffer and avoids a full concat + global sort when the sorted-run assumption holds.
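The heap-driven selection in step 5 can be illustrated on plain sorted runs of timestamps. This is a simplified stand-in: each Vec<i64> models one file's time-ordered stream, whereas the real merge pulls whole rows from ParquetRecordBatchReader streams:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge of per-file time-sorted runs, as in Path A.
fn k_way_merge(runs: Vec<Vec<i64>>) -> Vec<i64> {
    // Heap entries: (time, run index, position within run); Reverse
    // turns the max-heap into a min-heap ordered by time.
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&t) = run.first() {
            heap.push(Reverse((t, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((t, i, pos))) = heap.pop() {
        out.push(t); // the real merge accumulates single-row slices here
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

fn main() {
    let merged = k_way_merge(vec![vec![1, 4, 7], vec![2, 3, 9], vec![5]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 5, 7, 9]);
}
```

Because each run is consumed front to back, peak memory stays proportional to the number of open readers plus the batch being accumulated, not to the total input size.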

Path B — Legacy full sort (fallback)

If a batch is not sorted by time, or the global merge order would otherwise be wrong, the implementation drops the stream sources, re-reads each path via read_parquet, and then:

  1. Collect all RecordBatches from all files.
  2. concat_batches into one batch.
  3. sort_to_indices + take on the time column (Section 8).
  4. write_batch_with_limit as in Path A.

This matches the older all-in-memory behavior and is correct for arbitrary row order at the cost of higher peak RAM.

CPU isolation

merge_parquet_streaming and the legacy sort path run inside tokio::task::spawn_blocking. The async portion only does storage I/O (prepare_parquet_stream, and read_parquet on fallback).


6. Schema Unification and Type Widening

Module: compaction_merge.rs — e.g. merge_arrow_schemas / build_unified_schema_from_batches (depending on merge path).

When merging files that were written at different times, their schemas may differ (new tags or fields added over time). The unified schema is the union of all columns from all input schemas or batches.

Algorithm

  1. Walk all fields from all batch schemas.
  2. Collect into a BTreeMap<String, (DataType, bool)> for deterministic ordering.
  3. For duplicate column names, apply type widening and merge nullability (nullable = a.nullable || b.nullable).

Column ordering

  • time is always first -- if present, it is removed from the BTreeMap and inserted at position 0.
  • All other columns follow in BTreeMap (alphabetical) order.

Type widening rules

Function: widen_type(existing, incoming) -> DataType

Existing Incoming Result
Int64 Float64 Float64
Float64 Int64 Float64
Int64 UInt64 Int64
UInt64 Int64 Int64
anything same type no change

This handles the case where a field was initially written as Integer but later data arrives as Float64. The wider type preserves all values without loss.
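The widening table can be expressed as a small match. This sketch uses a simplified stand-in enum rather than arrow's real DataType, but the rules mirror the table above:

```rust
/// Simplified stand-in for arrow's DataType, just enough to
/// illustrate the widening rules (a sketch, not the real code).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    UInt64,
    Float64,
    Utf8,
}

/// Pick the wider of two column types when input schemas disagree.
fn widen_type(existing: &DataType, incoming: &DataType) -> DataType {
    use DataType::*;
    match (existing, incoming) {
        (Int64, Float64) | (Float64, Int64) => Float64,
        (Int64, UInt64) | (UInt64, Int64) => Int64,
        _ => existing.clone(), // same type (or no widening rule): keep existing
    }
}

fn main() {
    assert_eq!(widen_type(&DataType::Int64, &DataType::Float64), DataType::Float64);
    assert_eq!(widen_type(&DataType::UInt64, &DataType::Int64), DataType::Int64);
    assert_eq!(widen_type(&DataType::Utf8, &DataType::Utf8), DataType::Utf8);
}
```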


7. Batch Conformance

Function: conform_batch(batch, target_schema) -> RecordBatch (in compaction_merge.rs)

Each input batch must be conformed to the unified schema before concatenation or k-way merge output assembly.

Algorithm

For each field in the target schema:

  1. Column exists in batch with matching type: Use as-is (Arc::clone).
  2. Column exists but type differs: Cast via arrow::compute::cast(). For example, Int64 -> Float64.
  3. Column does not exist in batch: Create a null array of the target type with arrow::array::new_null_array(data_type, num_rows).

Error handling

Cast failures produce HyperbytedbError::Storage with a descriptive message including the column name, source type, and target type.


8. Sort by Time

On the legacy fallback path, after concat_batches, the merged batch is sorted by the time column in ascending order. The streaming k-way path does not perform this global sort; it relies on per-file time order and the heap merge.

Legacy sort implementation:

let indices = sort_to_indices(merged.column(time_idx), Some(sort_options), None)?;
let sorted_columns: Vec<_> = merged.columns()
    .iter()
    .map(|col| take(col.as_ref(), &indices, None))
    .collect::<Result<Vec<_>, _>>()?;

Sort options

  • descending: false -- ascending time order
  • nulls_first: false -- null timestamps (rare) sort last

This produces a single, time-ordered RecordBatch ready for writing.


9. Size-Based Output Splitting

Function: write_batch_with_limit(batch, target_bytes) -> Vec<MergedOutput> (compaction_merge.rs)

When the merged output would exceed target_file_size_bytes, it is split into multiple files.

Algorithm (recursive binary splitting)

  1. If target_bytes > 0 and num_rows > 1:
  2. Estimate output size: get_array_memory_size() / 3 (Arrow memory size divided by 3 to approximate Parquet compression).
  3. If estimate > target_bytes:
    • Split at num_rows / 2.
    • Recursively process each half.
    • Concatenate results.
  4. Otherwise, write the batch as a single Parquet file.
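The recursion can be sketched over a toy batch model. Here a batch is just a row count plus its Arrow memory size, and the function returns chunk row counts instead of writing Parquet; the real write_batch_with_limit emits MergedOutput values:

```rust
/// Toy model of a batch: number of rows and total Arrow memory size.
#[derive(Clone, Copy)]
struct Batch {
    rows: usize,
    arrow_bytes: usize,
}

impl Batch {
    /// Approximate on-disk size, as in the service: Arrow memory
    /// size divided by 3 to account for Parquet compression.
    fn estimated_parquet_bytes(&self) -> usize {
        self.arrow_bytes / 3
    }
    fn split_half(&self) -> (Batch, Batch) {
        let left = self.rows / 2;
        let left_bytes = self.arrow_bytes * left / self.rows;
        (
            Batch { rows: left, arrow_bytes: left_bytes },
            Batch { rows: self.rows - left, arrow_bytes: self.arrow_bytes - left_bytes },
        )
    }
}

/// Recursive binary splitting: collect the row count of each output chunk.
fn split_for_write(batch: Batch, target_bytes: usize, out: &mut Vec<usize>) {
    if target_bytes > 0 && batch.rows > 1 && batch.estimated_parquet_bytes() > target_bytes {
        let (a, b) = batch.split_half();
        split_for_write(a, target_bytes, out);
        split_for_write(b, target_bytes, out);
    } else {
        out.push(batch.rows); // small enough: one output file
    }
}

fn main() {
    // 1000 rows, ~300 KB in Arrow memory => ~100 KB estimated Parquet;
    // against a 30 KB target this splits into four 250-row chunks.
    let mut chunks = Vec::new();
    split_for_write(Batch { rows: 1000, arrow_bytes: 300_000 }, 30_000, &mut chunks);
    assert_eq!(chunks, vec![250, 250, 250, 250]);
}
```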

Output structure

Each split produces a MergedOutput:

struct MergedOutput {
    data: Vec<u8>,      // Parquet bytes
    min_time: i64,      // Earliest timestamp in this chunk
    max_time: i64,      // Latest timestamp in this chunk
}

Time bounds extraction

extract_time_bounds(batch) reads the first and last non-null values from the time column (leveraging the fact that the batch is already sorted by time).
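A minimal sketch of that extraction, modeling the time column as a slice of Option<i64> already sorted ascending with nulls last (the real function operates on an Arrow array):

```rust
/// Sketch of extract_time_bounds: first and last non-null values of a
/// time column sorted ascending with nulls last (nulls_first: false).
/// Returns (min_time, max_time), or None if every value is null.
fn extract_time_bounds(time: &[Option<i64>]) -> Option<(i64, i64)> {
    let min = time.iter().find_map(|v| *v)?;       // first non-null
    let max = time.iter().rev().find_map(|v| *v)?; // last non-null
    Some((min, max))
}

fn main() {
    let col = [Some(10), Some(20), Some(35), None];
    assert_eq!(extract_time_bounds(&col), Some((10, 35)));
    assert_eq!(extract_time_bounds(&[None, None]), None);
}
```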


10. Output Registration and Cleanup

Module: compaction_service.rs -- register_outputs_and_cleanup() (uses MergedOutput from the merge step).

Phase 1: Write and register new files

For each MergedOutput:

  1. Compute CRC32 checksum.
  2. Generate a unique compacted path via make_compacted_path().
  3. Write to storage: storage.write_parquet(rel_path, data).
  4. Register in metadata: metadata.register_parquet_file(db, rp, meas, full_path, min_time, max_time, Some(crc32)).

Phase 2: Remove old files

For each input file:

  1. Unregister from metadata: metadata.unregister_parquet_file(db, rp, meas, path).
  2. Delete from storage: storage.delete_parquet(rel_path).

Failure handling

  • If a new file write fails, the function returns Ok(false) and the partition is skipped. Old files remain intact.
  • If an old file deletion fails, it is logged at WARN level but metadata is still unregistered. The orphan cleanup will remove it later.

11. Compacted File Naming

Function: make_compacted_path(layout, db, rp, measurement, bucket_nanos) -> String

Compacted files use a distinctive naming convention with a _c prefix on the UUID suffix:

Base:      {db}/{rp}/{measurement}/{YYYY-MM-DD}/{HH}.parquet
Compacted: {db}/{rp}/{measurement}/{YYYY-MM-DD}/{HH}_c{8-char-uuid}.parquet

Examples:

  • mydb/autogen/cpu/2024-01-15/12_ca1b2c3d.parquet
  • mydb/autogen/cpu/2024-01-15/12_ce5f6g7h.parquet (if size-split)

The _c prefix distinguishes compacted files from flush-created files (which use _{uuid} without the c). This is informational only; the system treats both identically.
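A possible sketch of the path construction, converting bucket_nanos to the date/hour components. The signature is an assumption (the layout argument is omitted and the 8-char suffix is passed in rather than generated), and the civil-date conversion uses Howard Hinnant's days-to-date algorithm:

```rust
/// Convert days since 1970-01-01 to a civil (year, month, day) date
/// (Howard Hinnant's algorithm).
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
    let doe = z - era * 146_097;                                   // [0, 146096]
    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);             // [0, 365]
    let mp = (5 * doy + 2) / 153;
    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let m = if mp < 10 { mp + 3 } else { mp - 9 } as u32;
    (yoe + era * 400 + if m <= 2 { 1 } else { 0 }, m, d)
}

/// Hypothetical sketch of make_compacted_path for an hourly bucket.
fn make_compacted_path(db: &str, rp: &str, meas: &str, bucket_nanos: i64, suffix: &str) -> String {
    let secs = bucket_nanos / 1_000_000_000;
    let (y, m, d) = civil_from_days(secs.div_euclid(86_400));
    let hour = secs.rem_euclid(86_400) / 3600;
    format!("{db}/{rp}/{meas}/{y:04}-{m:02}-{d:02}/{hour:02}_c{suffix}.parquet")
}

fn main() {
    // Hourly bucket starting at 2024-01-15T12:00:00Z.
    let p = make_compacted_path("mydb", "autogen", "cpu", 1_705_320_000_000_000_000, "a1b2c3d4");
    assert_eq!(p, "mydb/autogen/cpu/2024-01-15/12_ca1b2c3d4.parquet");
}
```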


12. Orphan File Cleanup

Function: cleanup_orphan_files(metadata, layout, db, rp, measurement) -> usize

After compaction, orphan files (files on disk with no metadata entry) are cleaned up.

Algorithm

  1. Build a HashSet<String> of all known Parquet file paths from metadata.
  2. Walk the measurement directory: {base_path}/{db}/{rp}/{measurement}/.
  3. For each date subdirectory, scan for .parquet files.
  4. If a file's path is not in the known set, delete it via std::fs::remove_file.
  5. Attempt to remove empty date directories via std::fs::remove_dir (non-recursive, fails silently if not empty).
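The core decision in step 4 is a set difference. This sketch isolates it by passing the directory listing in as a slice instead of walking the filesystem, which the real cleanup_orphan_files does:

```rust
use std::collections::HashSet;

/// Sketch of the orphan decision: any .parquet file found on disk
/// whose path is not registered in metadata is an orphan.
fn find_orphans<'a>(on_disk: &'a [String], known: &HashSet<String>) -> Vec<&'a String> {
    on_disk
        .iter()
        .filter(|p| p.ends_with(".parquet") && !known.contains(*p))
        .collect()
}

fn main() {
    let known: HashSet<String> =
        ["mydb/autogen/cpu/2024-01-15/12_ca1b2c3d.parquet".to_string()].into();
    let on_disk = vec![
        "mydb/autogen/cpu/2024-01-15/12_ca1b2c3d.parquet".to_string(), // registered
        "mydb/autogen/cpu/2024-01-15/12_e5f6g7h8.parquet".to_string(), // leftover input
    ];
    let orphans = find_orphans(&on_disk, &known);
    assert_eq!(orphans.len(), 1);
    assert!(orphans[0].ends_with("12_e5f6g7h8.parquet"));
}
```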

When orphans occur

Orphan files can result from:

  • Crashes between writing a new compacted file and deleting old files.
  • Manual file placement.
  • Race conditions between concurrent compaction runs.

Error handling

  • File deletion errors are silently ignored.
  • Directory removal errors are silently ignored.

13. Tombstone Non-Application

Important design note: Tombstones (from DELETE statements) are not applied during compaction.

How tombstones work in HyperbyteDB

  1. DELETE FROM "cpu" WHERE time < '2024-01-01' stores a tombstone in metadata.
  2. At query time, the query service loads tombstones and injects AND NOT (predicate) into the generated SQL.
  3. During compaction, files are merged as-is without filtering by tombstone predicates.
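The query-time injection in step 2 can be illustrated as a string rewrite. This is illustrative only; the actual SQL generation in the query service is more involved:

```rust
/// Sketch of query-time tombstone filtering: each tombstone predicate
/// is wrapped in AND NOT (...) and appended to the WHERE clause.
fn apply_tombstones(base_where: &str, tombstones: &[&str]) -> String {
    let mut clause = String::from(base_where);
    for predicate in tombstones {
        clause.push_str(&format!(" AND NOT ({predicate})"));
    }
    clause
}

fn main() {
    let sql = apply_tombstones("host = 'a'", &["time < '2024-01-01'"]);
    assert_eq!(sql, "host = 'a' AND NOT (time < '2024-01-01')");
}
```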

Rationale

This design keeps compaction simple and idempotent:

  • No need to parse and apply SQL predicates during compaction.
  • No risk of data loss from incorrectly applied tombstone logic.
  • Tombstones are applied consistently at the single query-time boundary.

Tombstone lifecycle

Tombstones persist in metadata until manually removed via metadata.remove_tombstone(). The system architecture doc mentions automatic tombstone removal during compaction, but this is not implemented in the current codebase.


14. Error Handling and Recovery

Merge failure

If merge_parquet_files() fails for a partition:

  • The error is logged at WARN level.
  • The partition is skipped (old files remain intact).
  • Other partitions continue processing.
  • The next compaction cycle will retry.

Write failure

If storage.write_parquet() fails for a compacted output:

  • register_outputs_and_cleanup() returns Ok(false).
  • The partition is skipped entirely.
  • No metadata changes are made.
  • Old files remain intact.

Metadata failure

If metadata.register_parquet_file() or metadata.unregister_parquet_file() fails:

  • The error propagates via ?.
  • The compaction run stops for this measurement.
  • Partial state may exist (some files registered, some not).
  • The orphan cleanup will eventually remove unreferenced files.

Task panic

If a spawn_blocking task panics:

  • The error is wrapped in HyperbytedbError::Internal("compaction task panicked: ...").
  • The compaction run fails for the affected measurement.

Idempotency

Compaction is safe to retry after any failure:

  • New files are always written with unique UUID-suffixed names.
  • Old files are only deleted after new files are written and registered.
  • Orphan cleanup handles any leftover files from partial runs.


15. Metrics

Metric Type Labels Description
hyperbytedb_compaction_runs_total counter -- Compaction cycles completed
hyperbytedb_compaction_errors_total counter -- Failed compaction cycles
hyperbytedb_compaction_duration_seconds histogram -- Compaction cycle duration
hyperbytedb_compaction_files_merged_total counter -- Input files merged (count of files consumed)
hyperbytedb_compaction_streaming_merges_total counter -- Streaming k-way merges completed successfully (not emitted on legacy fallback)
hyperbytedb_parquet_files_count gauge db, rp, measurement Current file count per measurement

The hyperbytedb_parquet_files_count gauge is updated after every compaction run via emit_file_count_gauges(), providing per-measurement visibility into file counts.

Cluster repair (additional): hyperbytedb_self_repair_hash_checks_total, hyperbytedb_self_repair_hash_mismatch_total, hyperbytedb_self_repair_origin_fetch_total, hyperbytedb_self_repair_errors_total — see Deep Dive: Self-Repair.


16. Compaction Lifecycle Summary

1. Trigger: periodic tick (compact) or `POST /internal/compact` (compact_all)
   |
2. List databases -> retention policies -> measurements
   |
3. For each measurement:
   |
   +-- Fetch all Parquet files with time ranges from metadata
   |
   +-- Skip if total files < min_files_to_compact (periodic only)
   |
   +-- Group files by time bucket:
   |     bucket = (min_time / bucket_duration_nanos) * bucket_duration_nanos
   |
   +-- For each bucket with >= 2 files:
   |     |
   |     +-- prepare_parquet_stream / merge (see compaction_merge.rs)
   |     |
   |     +-- [spawn_blocking] Merge pipeline (compaction_merge.rs):
   |     |     |
   |     |     +-- Build unified schema (union of columns)
   |     |     +-- Conform batches to unified schema
   |     |     +-- Path A: k-way merge by time, accumulate rows, write batches
   |     |     +-- Path B (fallback): concat all batches, sort by time, write batches
   |     |     +-- Split if > target_file_size_bytes (write_batch_with_limit)
   |     |     +-- Write each output as Parquet (ZSTD, row groups)
   |     |
   |     +-- For each output:
   |     |     +-- Compute CRC32
   |     |     +-- Write to storage with _c{uuid} name
   |     |     +-- Register in metadata
   |     |
   |     +-- For each input:
   |           +-- Unregister from metadata
   |           +-- Delete from storage
   |
   +-- Run orphan cleanup:
         +-- List files on disk not in metadata
         +-- Delete orphans
         +-- Remove empty directories
   |
4. Emit metrics and gauges

17. Cluster mode: verification and self-repair

When the process is bootstrapped with cluster membership and compaction enabled (src/bootstrap.rs), each compaction tick can run two extra passes after compact():

  1. Verified compaction — For time buckets older than verified_compaction_age_secs, compare per-origin content hashes with the authoring peer for slices already present locally; on mismatch, replace files from the peer’s sync manifest (origin-filtered rows).
  2. Membership-driven repair — If self_repair_enabled, fetch each active peer’s sync manifest (up to max_repair_checks_per_cycle distinct repair targets per tick), hash-compare buckets where that peer authored cold data, and fetch missing or divergent slices.

Sync manifests expose origin_node_id per Parquet row for precise filtering; older peers omit it and repair falls back to _n{id}_ path tags.

Full protocol, HTTP API, and operations: Deep Dive: Self-Repair.