Deep Dive: Compaction¶
This document provides an exhaustive description of the HyperbyteDB compaction service. It covers trigger conditions, time-bucket grouping, multi-file Parquet merging via Arrow, schema evolution, size-based output splitting, orphan cleanup, and error handling.
Table of Contents¶
- Overview
- Configuration
- Trigger Conditions
- Time Bucket Grouping
- Multi-File Parquet Merge
- Schema Unification and Type Widening
- Batch Conformance
- Sort by Time
- Size-Based Output Splitting
- Output Registration and Cleanup
- Compacted File Naming
- Orphan File Cleanup
- Tombstone Non-Application
- Error Handling and Recovery
- Metrics
- Compaction Lifecycle Summary
- Cluster mode: verification and self-repair
1. Overview¶
The compaction service merges multiple small Parquet files within the same time partition into fewer, larger files. This reduces file count, improves query performance (fewer files for chDB to scan), and produces better-compressed output.
Key design decisions:
- Compaction operates entirely via Arrow (no chDB involvement).
- Tombstones are NOT applied during compaction -- they remain in metadata for query-time filtering.
- Compaction is idempotent -- crashes leave old files intact for retry.
- Output files are split when they exceed a target size.
Source modules (src/application/)¶
| File | Role |
|---|---|
compaction_service.rs | Orchestration: CompactionService — background loop (compact()), aggressive compact_all() (with bounded concurrency), per-bucket file grouping, calls into merge + metadata + storage, verified compaction and membership self-repair in cluster mode, orphan Parquet cleanup, metrics (emit_file_count_gauges). |
compaction_merge.rs | Merge engine: merge_parquet_files, streaming merge_parquet_streaming (k-way merge by time over ParquetRecordBatchReader streams), schema merge + conform_batch, merge_to_sorted_batch + write_batch_with_limit as fallback when inputs are not time-sorted per file, and shared helpers such as extract_time_bounds for output metadata. |
Before compaction:
cpu/2024-01-15/12_a1b2c3d4.parquet (1 MB)
cpu/2024-01-15/12_e5f6g7h8.parquet (2 MB)
cpu/2024-01-15/12_i9j0k1l2.parquet (500 KB)
After compaction:
cpu/2024-01-15/12_cm3n4o5p.parquet (3.5 MB, sorted by time)
2. Configuration¶
File: src/config.rs -- CompactionConfig
| Parameter | Default | Description |
|---|---|---|
enabled | true | Enable/disable background compaction |
interval_secs | 30 | How often to check for compaction opportunities |
min_files_to_compact | 2 | Minimum total file count for a measurement before any compaction triggers |
target_file_size_mb | 256 | Approximate target output file size in MB |
bucket_duration | "1h" | Time bucket granularity: "1h" (hourly) or "1d" (daily) |
verified_compaction_age_secs | 3600 | Minimum age before cluster verification / repair considers a time bucket |
self_repair_enabled | true | Enable membership-driven missing-slice repair (cluster bootstrap) |
max_repair_checks_per_cycle | 128 | Cap hash checks and repair attempts per tick |
compact_all_max_inflight | 8 | Max concurrent compact_measurement tasks when compact_all() runs (semaphore) |
Bucket duration¶
The bucket_duration determines how files are grouped for compaction:
| Value | Nanoseconds | Behavior |
|---|---|---|
"1h" | 3,600,000,000,000 | Merge files within the same hour |
"1d" / "24h" | 86,400,000,000,000 | Merge all 24 hourly files into one day-level file |
Daily buckets are useful for workloads with wide time-range queries, as they dramatically reduce the number of files chDB must scan.
3. Trigger Conditions¶
Periodic compaction (compact())¶
The background service runs compact() every interval_secs:
- Iterate all databases, retention policies, and measurements.
- For each measurement, fetch all Parquet files with time ranges via metadata.get_parquet_files_with_times().
- Skip if total file count < min_files_to_compact (default 2). This avoids compaction overhead for measurements with few files.
- Group files by time bucket.
- Compact only buckets with >= 2 files. Single-file buckets are left untouched.
Aggressive compaction (compact_all())¶
Called via POST /internal/compact or during operational maintenance:
- Same grouping logic, but no min_files_to_compact threshold.
- Every bucket with >= 2 files is compacted.
- Spawns one tokio::spawn task per (db, rp, measurement), with concurrency capped by compact_all_max_inflight (default 8) via a semaphore -- not unbounded parallelism.
- Returns a CompactSummary with per-measurement progress.
4. Time Bucket Grouping¶
Files are grouped into time buckets using integer division on min_time:
bucket = (min_time / bucket_duration_nanos) * bucket_duration_nanos
For example, with hourly buckets (bucket_duration_nanos = 3,600,000,000,000):
- A file with min_time = 1705312800000000000 (2024-01-15T10:00:00Z) -> bucket 1705312800000000000
- A file with min_time = 1705313100000000000 (2024-01-15T10:05:00Z) -> same bucket 1705312800000000000
Files in the same bucket are candidates for merging.
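The bucket computation above can be sketched as follows (a standalone sketch; the constant and function names are illustrative, not the actual identifiers in compaction_service.rs):

```rust
/// Hourly bucket duration in nanoseconds (matches bucket_duration = "1h").
const HOUR_NANOS: i64 = 3_600_000_000_000;

/// Truncate a nanosecond timestamp to the start of its time bucket
/// via integer division, as in the grouping step.
fn bucket_for(min_time: i64, bucket_duration_nanos: i64) -> i64 {
    (min_time / bucket_duration_nanos) * bucket_duration_nanos
}
```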
5. Multi-File Parquet Merge¶
Module: src/application/compaction_merge.rs
Entry point: merge_parquet_files(storage, paths, target_file_size_bytes) — async; prepares stream sources, then runs CPU-heavy work on the blocking pool.
The merge operates entirely in Arrow (no chDB). There are two paths.
Path A — Streaming k-way merge (default)¶
Used when each input file has rows non-decreasing in time (true for WAL flush output and files already produced by compaction).
- Prepare streams: For each path, storage.prepare_parquet_stream(path) yields a ParquetStreamSource: a local filesystem path, or (for S3) a temp file populated from a full download so Parquet can still be read sequentially from disk.
- Open readers: One ParquetRecordBatchReader per file (via ParquetRecordBatchReaderBuilder + std::fs::File).
- Unified schema: Column union and type widening from each file's Parquet schema (see Section 6).
- Conform each loaded batch to the unified schema (Section 7).
- K-way merge: A min-heap ordered by normalized time nanoseconds selects the next row across files; single-row slices are accumulated into batches. Flushes occur when estimated size or row count exceeds thresholds (aligned with target_file_size_bytes and an internal row cap).
- Write: write_batch_with_limit emits ZSTD Parquet bytes and time bounds for metadata.
This path avoids loading every file’s bytes into a single Bytes buffer and avoids a full concat + global sort when the sorted-run assumption holds.
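The heap-based selection can be illustrated with a simplified sketch, where each "file" is a Vec of timestamps already sorted ascending (standing in for the per-file ParquetRecordBatchReader streams; the real merge pushes single-row slices into output batches rather than raw i64s):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Sketch of a k-way merge over sorted runs using a min-heap.
/// Heap entries are Reverse((next timestamp, run index, offset)).
fn k_way_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&t) = run.first() {
            heap.push(Reverse((t, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((t, i, off))) = heap.pop() {
        out.push(t); // real merge: append a single-row slice to the current batch
        if let Some(&next) = runs[i].get(off + 1) {
            heap.push(Reverse((next, i, off + 1)));
        }
    }
    out
}
```

Peak memory stays proportional to one pending row per input plus the output batch under construction, which is why this path beats the full concat-and-sort fallback.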
Path B — Legacy full sort (fallback)¶
If any input batch is not sorted by time (so the global merge order would be wrong), the implementation drops the stream sources, re-reads each path via read_parquet, and then:
- Collect all RecordBatches from all files.
- concat_batches into one batch.
- sort_to_indices + take on the time column (Section 8).
- write_batch_with_limit as in Path A.
This matches the older all-in-memory behavior and is correct for arbitrary row order at the cost of higher peak RAM.
CPU isolation¶
merge_parquet_streaming and the legacy sort path run inside tokio::task::spawn_blocking. The async portion only does storage I/O (prepare_parquet_stream, and read_parquet on fallback).
6. Schema Unification and Type Widening¶
Module: compaction_merge.rs — e.g. merge_arrow_schemas / build_unified_schema_from_batches (depending on merge path).
When merging files that were written at different times, their schemas may differ (new tags or fields added over time). The unified schema is the union of all columns from all input schemas or batches.
Algorithm¶
- Walk all fields from all batch schemas.
- Collect into a BTreeMap<String, (DataType, bool)> for deterministic ordering.
- For duplicate column names, apply type widening and merge nullability (nullable = a.nullable || b.nullable).
Column ordering¶
- time is always first -- if present, it is removed from the BTreeMap and inserted at position 0.
- All other columns follow in BTreeMap (alphabetical) order.
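The column-union and ordering rules can be sketched without Arrow types (a simplified stand-in where a schema is a list of (name, type, nullable) triples; the type-widening step between duplicates is covered in the next subsection and omitted here):

```rust
use std::collections::BTreeMap;

/// Sketch of the schema-union step: collect columns from all inputs into a
/// BTreeMap for deterministic order, merge nullability on duplicates, and
/// force "time" to position 0.
fn unify_columns(
    schemas: &[Vec<(String, String, bool)>], // (name, type, nullable)
) -> Vec<(String, String, bool)> {
    let mut map: BTreeMap<String, (String, bool)> = BTreeMap::new();
    for schema in schemas {
        for (name, dtype, nullable) in schema {
            map.entry(name.clone())
                .and_modify(|(_, n)| *n = *n || *nullable) // merge nullability
                .or_insert_with(|| (dtype.clone(), *nullable));
        }
    }
    let mut out = Vec::new();
    if let Some((t, n)) = map.remove("time") {
        out.push(("time".to_string(), t, n)); // time always first
    }
    // remaining columns in BTreeMap (alphabetical) order
    out.extend(map.into_iter().map(|(k, (t, n))| (k, t, n)));
    out
}
```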
Type widening rules¶
Function: widen_type(existing, incoming) -> DataType
| Existing | Incoming | Result |
|---|---|---|
Int64 | Float64 | Float64 |
Float64 | Int64 | Float64 |
Int64 | UInt64 | Int64 |
UInt64 | Int64 | Int64 |
| anything | same type | no change |
This handles the case where a field was initially written as Integer but later data arrives as Float64. The wider type preserves all values without loss.
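The widening table translates directly into a match, sketched here with a simplified stand-in enum (the real code operates on arrow::datatypes::DataType):

```rust
/// Simplified stand-in for Arrow's DataType, covering only the
/// cases in the widening table above.
#[derive(Clone, Copy, PartialEq, Debug)]
enum DType { Int64, UInt64, Float64, Utf8 }

/// Sketch of widen_type: pick a type that can represent both inputs.
fn widen_type(existing: DType, incoming: DType) -> DType {
    use DType::*;
    match (existing, incoming) {
        (Int64, Float64) | (Float64, Int64) => Float64,
        (Int64, UInt64) | (UInt64, Int64) => Int64,
        (a, b) if a == b => a, // same type: no change
        (a, _) => a,           // fallback in this sketch: keep existing
    }
}
```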
7. Batch Conformance¶
Function: conform_batch(batch, target_schema) -> RecordBatch (in compaction_merge.rs)
Each input batch must be conformed to the unified schema before concatenation or k-way merge output assembly.
Algorithm¶
For each field in the target schema:
- Column exists in batch with matching type: Use as-is (Arc::clone).
- Column exists but type differs: Cast via arrow::compute::cast(). For example, Int64 -> Float64.
- Column does not exist in batch: Create a null array of the target type with arrow::array::new_null_array(data_type, num_rows).
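A pure-Rust sketch of the conformance idea, with columns modeled as Vec<Option<f64>> instead of Arrow arrays (so the cast branch is omitted; only the reuse-or-null-fill logic is shown):

```rust
use std::collections::HashMap;

/// Sketch of conform_batch: for each target column, reuse the existing
/// column if present, otherwise synthesize an all-null column of the
/// batch's row count (the analogue of new_null_array).
fn conform_batch(
    batch: &HashMap<String, Vec<Option<f64>>>,
    target_columns: &[String],
    num_rows: usize,
) -> Vec<Vec<Option<f64>>> {
    target_columns
        .iter()
        .map(|name| match batch.get(name) {
            Some(col) => col.clone(),     // column exists: use as-is
            None => vec![None; num_rows], // missing: null column
        })
        .collect()
}
```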
Error handling¶
Cast failures produce HyperbytedbError::Storage with a descriptive message including the column name, source type, and target type.
8. Sort by Time¶
On the legacy fallback path, after concat_batches, the merged batch is sorted by the time column in ascending order. The streaming k-way path does not perform this global sort; it relies on per-file time order and the heap merge.
Legacy sort implementation:
let indices = sort_to_indices(merged.column(time_idx), Some(sort_options), None)?;
let sorted_columns = merged
    .columns()
    .iter()
    .map(|col| take(col.as_ref(), &indices, None))
    .collect::<Result<Vec<_>, _>>()?;
Sort options¶
- descending: false -- ascending time order
- nulls_first: false -- null timestamps (rare) sort last
This produces a single, time-ordered RecordBatch ready for writing.
9. Size-Based Output Splitting¶
Function: write_batch_with_limit(batch, target_bytes) -> Vec<MergedOutput> (compaction_merge.rs)
When the merged output would exceed target_file_size_bytes, it is split into multiple files.
Algorithm (recursive binary splitting)¶
- If target_bytes > 0 and num_rows > 1:
  - Estimate output size as get_array_memory_size() / 3 (Arrow memory size divided by 3 to approximate Parquet compression).
  - If the estimate > target_bytes:
    - Split at num_rows / 2.
    - Recursively process each half.
    - Concatenate results.
- Otherwise, write the batch as a single Parquet file.
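The recursion can be sketched on row counts alone (a simplification: this returns the row count of each output chunk rather than writing Parquet, and `bytes_per_row` stands in for the memory-size-divided-by-3 estimate):

```rust
/// Sketch of write_batch_with_limit's recursive binary splitting.
/// Returns the row count of each output chunk, in order.
fn split_rows(num_rows: usize, bytes_per_row: usize, target_bytes: usize) -> Vec<usize> {
    if target_bytes > 0 && num_rows > 1 {
        let estimate = num_rows * bytes_per_row; // proxy for memory size / 3
        if estimate > target_bytes {
            let half = num_rows / 2;
            let mut out = split_rows(half, bytes_per_row, target_bytes);
            out.extend(split_rows(num_rows - half, bytes_per_row, target_bytes));
            return out;
        }
    }
    vec![num_rows] // fits: emit as a single file
}
```

Because each level halves the row count, an oversized batch converges to chunks just under the target rather than producing one undersized remainder.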
Output structure¶
Each split produces a MergedOutput:
struct MergedOutput {
data: Vec<u8>, // Parquet bytes
min_time: i64, // Earliest timestamp in this chunk
max_time: i64, // Latest timestamp in this chunk
}
Time bounds extraction¶
extract_time_bounds(batch) reads the first and last non-null values from the time column (leveraging the fact that the batch is already sorted by time).
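A pure-Rust sketch of the bounds extraction, assuming the time column is a slice of Option<i64> sorted ascending with nulls last (the real function reads an Arrow array):

```rust
/// Sketch of extract_time_bounds: first and last non-null values of a
/// time column that is already sorted ascending give (min_time, max_time).
fn extract_time_bounds(time: &[Option<i64>]) -> Option<(i64, i64)> {
    let min = time.iter().flatten().copied().next()?;
    let max = time.iter().flatten().copied().next_back()?;
    Some((min, max))
}
```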
10. Output Registration and Cleanup¶
Module: compaction_service.rs — register_outputs_and_cleanup() (uses MergedOutput from the merge step).
Phase 1: Write and register new files¶
For each MergedOutput:
- Compute CRC32 checksum.
- Generate a unique compacted path via make_compacted_path().
- Write to storage: storage.write_parquet(rel_path, data).
- Register in metadata: metadata.register_parquet_file(db, rp, meas, full_path, min_time, max_time, Some(crc32)).
Phase 2: Remove old files¶
For each input file:
- Unregister from metadata: metadata.unregister_parquet_file(db, rp, meas, path).
- Delete from storage: storage.delete_parquet(rel_path).
Failure handling¶
- If a new file write fails, the function returns Ok(false) and the partition is skipped. Old files remain intact.
- If an old file deletion fails, it is logged at WARN level but metadata is still unregistered. The orphan cleanup will remove it later.
11. Compacted File Naming¶
Function: make_compacted_path(layout, db, rp, measurement, bucket_nanos) -> String
Compacted files use a distinctive naming convention with a _c prefix on the UUID suffix:
Base: {db}/{rp}/{measurement}/{YYYY-MM-DD}/{HH}.parquet
Compacted: {db}/{rp}/{measurement}/{YYYY-MM-DD}/{HH}_c{8-char-uuid}.parquet
Examples:
- mydb/autogen/cpu/2024-01-15/12_ca1b2c3d.parquet
- mydb/autogen/cpu/2024-01-15/12_ce5f6g7h.parquet (if size-split)
The _c prefix distinguishes compacted files from flush-created files (which use _{uuid} without the c). This is informational only; the system treats both identically.
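The naming convention reduces to string formatting once the date and hour components are derived from bucket_nanos. A sketch with those components assumed precomputed (the real make_compacted_path takes the layout and bucket_nanos instead of preformatted strings):

```rust
/// Sketch of the compacted-file naming convention: the hour file name gets
/// a `_c` prefix followed by an 8-char UUID suffix.
fn make_compacted_path(
    db: &str,
    rp: &str,
    meas: &str,
    date: &str,       // "YYYY-MM-DD", derived from bucket_nanos
    hour: &str,       // "HH", derived from bucket_nanos
    short_uuid: &str, // 8-char unique suffix
) -> String {
    format!("{db}/{rp}/{meas}/{date}/{hour}_c{short_uuid}.parquet")
}
```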
12. Orphan File Cleanup¶
Function: cleanup_orphan_files(metadata, layout, db, rp, measurement) -> usize
After compaction, orphan files (files on disk with no metadata entry) are cleaned up.
Algorithm¶
- Build a HashSet<String> of all known Parquet file paths from metadata.
- Walk the measurement directory: {base_path}/{db}/{rp}/{measurement}/.
- For each date subdirectory, scan for .parquet files.
- If a file's path is not in the known set, delete it via std::fs::remove_file.
- Attempt to remove empty date directories via std::fs::remove_dir (non-recursive, fails silently if not empty).
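The walk-and-delete logic can be sketched with std::fs alone (a simplified sketch with a hypothetical signature; the real function also takes metadata and layout handles):

```rust
use std::collections::HashSet;
use std::fs;
use std::path::Path;

/// Sketch of orphan cleanup: delete .parquet files under `measurement_dir`
/// that are not in the `known` set, then try to remove now-empty date
/// directories. All errors are silently ignored, mirroring the doc.
fn cleanup_orphan_files(measurement_dir: &Path, known: &HashSet<String>) -> usize {
    let mut removed = 0;
    let Ok(dates) = fs::read_dir(measurement_dir) else { return 0 };
    for date in dates.flatten() {
        let date_path = date.path();
        if !date_path.is_dir() {
            continue;
        }
        if let Ok(files) = fs::read_dir(&date_path) {
            for f in files.flatten() {
                let p = f.path();
                let is_parquet = p.extension().map_or(false, |e| e == "parquet");
                if is_parquet && !known.contains(&p.to_string_lossy().to_string()) {
                    if fs::remove_file(&p).is_ok() {
                        removed += 1;
                    }
                }
            }
        }
        // Non-recursive; fails silently if the directory is not empty.
        let _ = fs::remove_dir(&date_path);
    }
    removed
}
```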
When orphans occur¶
Orphan files can result from:
- Crashes between writing a new compacted file and deleting old files.
- Manual file placement.
- Race conditions between concurrent compaction runs.
Error handling¶
- File deletion errors are silently ignored.
- Directory removal errors are silently ignored.
13. Tombstone Non-Application¶
Important design note: Tombstones (from DELETE statements) are not applied during compaction.
How tombstones work in HyperbyteDB¶
- DELETE FROM "cpu" WHERE time < '2024-01-01' stores a tombstone in metadata.
- At query time, the query service loads tombstones and injects AND NOT (predicate) into the generated SQL.
- During compaction, files are merged as-is without filtering by tombstone predicates.
Rationale¶
This design keeps compaction simple and idempotent:
- No need to parse and apply SQL predicates during compaction.
- No risk of data loss from incorrectly applied tombstone logic.
- Tombstones are applied consistently at the single query-time boundary.
Tombstone lifecycle¶
Tombstones persist in metadata until manually removed via metadata.remove_tombstone(). The system architecture doc mentions automatic tombstone removal during compaction, but this is not implemented in the current codebase.
14. Error Handling and Recovery¶
Merge failure¶
If merge_parquet_files() fails for a partition:
- The error is logged at WARN level.
- The partition is skipped (old files remain intact).
- Other partitions continue processing.
- The next compaction cycle will retry.
Write failure¶
If storage.write_parquet() fails for a compacted output:
- register_outputs_and_cleanup() returns Ok(false).
- The partition is skipped entirely.
- No metadata changes are made.
- Old files remain intact.
Metadata failure¶
If metadata.register_parquet_file() or metadata.unregister_parquet_file() fails:
- The error propagates via ?.
- The compaction run stops for this measurement.
- Partial state may exist (some files registered, some not).
- The orphan cleanup will eventually remove unreferenced files.
Task panic¶
If a spawn_blocking task panics:
- The error is wrapped in HyperbytedbError::Internal("compaction task panicked: ...").
- The compaction run fails for the affected measurement.
Idempotency¶
Compaction is safe to retry after any failure:
- New files are always written with unique UUID-suffixed names.
- Old files are only deleted after new files are written and registered.
- Orphan cleanup handles any leftover files from partial runs.
15. Metrics¶
| Metric | Type | Labels | Description |
|---|---|---|---|
hyperbytedb_compaction_runs_total | counter | -- | Compaction cycles completed |
hyperbytedb_compaction_errors_total | counter | -- | Failed compaction cycles |
hyperbytedb_compaction_duration_seconds | histogram | -- | Compaction cycle duration |
hyperbytedb_compaction_files_merged_total | counter | -- | Input files merged (count of files consumed) |
hyperbytedb_compaction_streaming_merges_total | counter | -- | Streaming k-way merges completed successfully (not emitted on legacy fallback) |
hyperbytedb_parquet_files_count | gauge | db, rp, measurement | Current file count per measurement |
The hyperbytedb_parquet_files_count gauge is updated after every compaction run via emit_file_count_gauges(), providing per-measurement visibility into file counts.
Cluster repair (additional): hyperbytedb_self_repair_hash_checks_total, hyperbytedb_self_repair_hash_mismatch_total, hyperbytedb_self_repair_origin_fetch_total, hyperbytedb_self_repair_errors_total — see Deep Dive: Self-Repair.
16. Compaction Lifecycle Summary¶
1. Trigger: periodic tick (compact) or `POST /internal/compact` (compact_all)
|
2. List databases -> retention policies -> measurements
|
3. For each measurement:
|
+-- Fetch all Parquet files with time ranges from metadata
|
+-- Skip if total files < min_files_to_compact (periodic only)
|
+-- Group files by time bucket:
| bucket = (min_time / bucket_duration_nanos) * bucket_duration_nanos
|
+-- For each bucket with >= 2 files:
| |
| +-- prepare_parquet_stream / merge (see compaction_merge.rs)
| |
| +-- [spawn_blocking] Merge pipeline (compaction_merge.rs):
| | |
| | +-- Build unified schema (union of columns)
| | +-- Conform batches to unified schema
| | +-- Path A: k-way merge by time, accumulate rows, write batches
| | +-- Path B (fallback): concat all batches, sort by time, write batches
| | +-- Split if > target_file_size_bytes (write_batch_with_limit)
| | +-- Write each output as Parquet (ZSTD, row groups)
| |
| +-- For each output:
| | +-- Compute CRC32
| | +-- Write to storage with _c{uuid} name
| | +-- Register in metadata
| |
| +-- For each input:
| +-- Unregister from metadata
| +-- Delete from storage
|
+-- Run orphan cleanup:
+-- List files on disk not in metadata
+-- Delete orphans
+-- Remove empty directories
|
4. Emit metrics and gauges
17. Cluster mode: verification and self-repair¶
When the process is bootstrapped with cluster membership and compaction enabled (src/bootstrap.rs), each compaction tick can run two extra passes after compact():
- Verified compaction -- For time buckets older than verified_compaction_age_secs, compare per-origin content hashes with the authoring peer for slices already present locally; on mismatch, replace files from the peer's sync manifest (origin-filtered rows).
- Membership-driven repair -- If self_repair_enabled, fetch each active peer's sync manifest (up to max_repair_checks_per_cycle distinct repair targets per tick), hash-compare buckets where that peer authored cold data, and fetch missing or divergent slices.
Sync manifests expose origin_node_id per Parquet row for precise filtering; older peers omit it and repair falls back to _n{id}_ path tags.
Full protocol, HTTP API, and operations: Deep Dive: Self-Repair.