Deep Dive: Write Path¶
This document traces every step of the HyperbyteDB write path, from the moment an HTTP request arrives to the point where data is persisted as a Parquet file on disk (or S3). It covers line protocol ingestion, WAL durability, the background flush pipeline, Arrow schema construction, Parquet encoding, and cluster replication.
Table of Contents¶
- Overview
- HTTP Write Handler
- Line Protocol Parsing
- Metadata Registration
- WAL Append
- Flush Pipeline
- Arrow Schema Construction
- RecordBatch Building
- Parquet File Writing
- Storage Path Layout
- File Registration
- WAL Truncation
- Cluster Write Replication
- Metrics
1. Overview¶
The write path is split into two phases:
Phase 1 -- Synchronous (client-blocking): HTTP request handling, line protocol parsing, metadata registration, and WAL append. The client receives a 204 No Content response as soon as the WAL append completes. Data is durable at this point.
Phase 2 -- Asynchronous (background): The flush service periodically reads WAL entries, converts them to Arrow RecordBatches, writes Parquet files, registers them in metadata, and truncates the WAL.
```
Client POST /write
        |
        v
+-------------------------------+
|  HTTP Handler (write.rs)      |
|  - Validate params            |
|  - Decompress gzip            |
|  - Cluster state check        |
+-------------------------------+
        |
        v
+-------------------------------+
|  Ingestion Service            |
|  - Parse line protocol        |
|  - Register metadata          |
|  - Append to WAL              |
+-------------------------------+
        |
        v  (204 returned to client)
        |
        v  (background, every N seconds)
+-------------------------------+
|  Flush Service                |
|  - Read WAL entries           |
|  - Group by (db, rp, meas)    |
|  - Partition by hour          |
|  - Convert to Arrow           |
|  - Write Parquet files        |
|  - Register in metadata       |
|  - Truncate WAL               |
+-------------------------------+
        |
        v
+-------------------------------+
|  Storage Backend              |
|  LocalStorage or S3Storage    |
+-------------------------------+
```
2. HTTP Write Handler¶
File: src/adapters/http/write.rs
Entry point¶
handle_write() handles POST /write requests.
Query parameters¶
| Parameter | Required | Default | Description |
|---|---|---|---|
| `db` | Yes | -- | Target database name |
| `rp` | No | Default RP | Retention policy name |
| `precision` | No | `ns` | Timestamp precision: `ns`, `us`/`u`, `ms`, `s` |
| `u` | No | -- | Username (when auth enabled) |
| `p` | No | -- | Password (when auth enabled) |
Processing steps¶
- Cluster state check -- In cluster mode, reject writes when the node's state is `Draining`, `Leaving`, `Syncing`, or `Joining`. Returns `503 Service Unavailable` with an optional `X-Hyperbytedb-Redirect` header pointing to an active peer.
- Validate `db` -- If missing, returns `400` with `"database is required"`.
- Gzip decompression -- If `Content-Encoding: gzip`, the body is decompressed via `flate2::read::GzDecoder`. This supports compressed payloads from Telegraf and other agents.
- Delegate to `IngestionPort.ingest()` -- Passes `db`, `rp`, `precision`, and the raw body bytes.
Metrics emitted¶
- `hyperbytedb_write_payload_bytes` -- histogram of raw payload size
- `hyperbytedb_write_duration_seconds` -- histogram of handler latency
- `hyperbytedb_write_requests_total` -- counter
- `hyperbytedb_write_errors_total` -- counter (on failure)
3. Line Protocol Parsing¶
File: src/application/ingestion_service.rs
Overview¶
The IngestionServiceImpl::ingest() method parses the body as InfluxDB line protocol and converts each line into a domain Point.
Line protocol format¶
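The body follows the standard InfluxDB line protocol: a measurement name, an optional comma-separated tag set, a space-separated field set, and an optional trailing timestamp. A representative line:

```
measurement[,tag_key=tag_value...] field_key=field_value[,...] [timestamp]

cpu,host=server01,region=us-west usage_idle=87.2,usage_user=8i 1705312800000000000
```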
Parsing steps¶
- UTF-8 validation -- The body bytes are converted to `&str`.
- `influxdb_line_protocol::parse_lines(input)` -- Uses the `influxdb-line-protocol` crate to produce `ParsedLine` structs.
- `parsed_line_to_point()` -- Converts each `ParsedLine` to a domain `Point`:
    - Measurement from `series.measurement`.
    - Tags from the tag set (sorted `BTreeMap<String, String>`).
    - Fields via `lp_field_value_to_domain()`:
        - `LpFieldValue::F64` -> `FieldValue::Float(f64)`
        - `LpFieldValue::I64` -> `FieldValue::Integer(i64)`
        - `LpFieldValue::U64` -> `FieldValue::UInteger(u64)`
        - `LpFieldValue::String` -> `FieldValue::String(String)`
        - `LpFieldValue::Boolean` -> `FieldValue::Boolean(bool)`
    - Timestamp -- If present, scaled to nanoseconds based on `precision`. If absent, defaults to `chrono::Utc::now().timestamp_nanos()`.
Precision scaling¶
| Precision | Multiplier |
|---|---|
| `s` | x 1,000,000,000 |
| `ms` | x 1,000,000 |
| `us` / `u` | x 1,000 |
| `ns` (default) | x 1 |
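The scaling reduces to a single multiplier lookup. A minimal sketch (the function name `scale_to_nanos` is illustrative, not the actual implementation):

```rust
/// Scale a raw timestamp to nanoseconds according to the write precision.
/// Unknown precisions fall through to the nanosecond default (multiplier 1).
fn scale_to_nanos(ts: i64, precision: &str) -> i64 {
    let multiplier: i64 = match precision {
        "s" => 1_000_000_000,
        "ms" => 1_000_000,
        "us" | "u" => 1_000,
        _ => 1, // "ns" and anything unrecognized
    };
    ts * multiplier
}

fn main() {
    // 1705312800 seconds -> nanoseconds since the epoch
    assert_eq!(scale_to_nanos(1_705_312_800, "s"), 1_705_312_800_000_000_000);
    assert_eq!(scale_to_nanos(42, "ms"), 42_000_000);
    assert_eq!(scale_to_nanos(42, "u"), 42_000);
    assert_eq!(scale_to_nanos(42, "ns"), 42);
}
```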
Error handling¶
A parse error on any line produces HyperbytedbError::LineProtocolParse { line, reason }, which maps to HTTP 400.
4. Metadata Registration¶
File: src/application/ingestion_service.rs
Before appending to the WAL, the ingestion service registers schema information in the metadata store.
Steps¶
- Resolve retention policy -- If `rp` is empty, resolve to the database's default retention policy (typically `autogen`).
- Verify database exists -- `metadata.get_database(db)` returns `HyperbytedbError::DatabaseNotFound` if missing.
- Group points by measurement -- Collect `field_types` (name -> discriminant) and `tag_keys` per measurement.
- Check field type compatibility -- `metadata.check_field_types(db, measurement, &field_types)` -- if a field was previously registered as `Float64` and a write sends `Integer`, returns `HyperbytedbError::FieldTypeConflict`.
- Cardinality limits -- Check `max_measurements_per_database` and `max_tag_values_per_measurement`. Returns `HyperbytedbError::CardinalityExceeded` if either limit is hit.
- Register measurement -- `metadata.register_measurement(db, measurement, field_types, tag_keys)` -- upserts `MeasurementMeta` in the metadata store.
- Store tag values -- `metadata.store_tag_value(db, measurement, tag_key, tag_value)` for each unique tag value. These are used by `SHOW TAG VALUES` queries.
MeasurementMeta¶
```rust
struct MeasurementMeta {
    name: String,
    field_types: HashMap<String, u8>, // field_name -> FieldValue discriminant
    tag_keys: Vec<String>,
}
```

Stored in RocksDB under key `meas:{db}:{name}` as JSON.
5. WAL Append¶
Files: src/ports/wal.rs, src/adapters/wal/rocksdb_wal.rs
WalEntry structure¶
All points from a single write request are grouped into one WalEntry.
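The exact field layout is not shown here; a plausible shape, with field names assumed purely for illustration, would carry the routing information alongside the parsed points:

```rust
// Illustrative sketch only -- these field names are assumptions,
// not the real bincode-serialized layout.
#[derive(Debug, Clone)]
struct Point {
    measurement: String,
    timestamp: i64,
}

/// One WAL entry per write request: all parsed points travel together.
#[derive(Debug, Clone)]
struct WalEntry {
    database: String,
    retention_policy: String,
    points: Vec<Point>,
}

fn main() {
    let entry = WalEntry {
        database: "mydb".into(),
        retention_policy: "autogen".into(),
        points: vec![Point { measurement: "cpu".into(), timestamp: 1 }],
    };
    assert_eq!(entry.points.len(), 1);
}
```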
RocksDB WAL implementation¶
The WAL uses RocksDB with two column families:
| Column Family | Purpose |
|---|---|
| `wal` | Ordered entries. Keys are big-endian `u64` sequence numbers. Values are bincode-serialized `WalEntry`. |
| `wal_meta` | Single key `last_seq` storing the current sequence as big-endian `u64`. |
Append operation¶
- `fetch_add` on an `AtomicU64` to get the next sequence number.
- `bincode::serialize(&entry)` the `WalEntry`.
- Create a `WriteBatch` with:
    - `put_cf("wal", big_endian(seq), serialized_entry)`
    - `put_cf("wal_meta", "last_seq", big_endian(seq))`
- `db.write(batch)` -- atomic, durable write.
- Return the sequence number.
Key encoding¶
Sequence numbers are stored as big-endian u64 (8 bytes). This preserves numerical ordering in RocksDB's lexicographic key space, enabling efficient forward iteration and range deletion.
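The ordering property can be checked directly with `u64::to_be_bytes`: byte-wise (lexicographic) comparison of big-endian encodings agrees with numeric comparison, which little-endian encodings do not guarantee:

```rust
fn main() {
    let a: u64 = 255; // 0x00..00FF
    let b: u64 = 256; // 0x00..0100
    // Big-endian bytes sort the same way as the numbers themselves.
    assert!(a.to_be_bytes() < b.to_be_bytes());
    // Little-endian bytes compare the low byte first and sort wrongly here.
    assert!(a.to_le_bytes() > b.to_le_bytes());
}
```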
Durability¶
The write is durable the moment db.write(batch) returns. RocksDB's WAL (not to be confused with HyperbyteDB's WAL concept) ensures fsync semantics.
6. Flush Pipeline¶
File: src/application/flush_service.rs
The flush service is the bridge between the WAL and Parquet storage. It runs as a background Tokio task.
Constants¶
| Constant | Value | Purpose |
|---|---|---|
| `ESTIMATED_BYTES_PER_POINT` | 512 | Memory budget calculation |
| `WAL_READ_CHUNK` | 5,000 | Max WAL entries per read batch |
| `MIN_BATCH_POINTS` | 10,000 | Minimum points per Parquet batch |
| `MAX_BATCH_POINTS` | 500,000 | Maximum points per Parquet batch |
| `MAX_WAL_RETENTION_ENTRIES` | 500,000 | WAL cap when peers are lagging |
Lifecycle¶
- Timer tick every `flush.interval_secs` (default 10s).
- On tick, call `flush()`.
- On shutdown signal, exit the loop. The drain service calls `drain()` for a final full flush.
flush() step by step¶
Step 1 -- Read WAL entries:
Reads up to 5,000 entries from the WAL starting after the last flushed sequence. If empty, the flush is a no-op. If there are more than 5,000 entries, the outer loop reads successive chunks.
Step 2 -- Group by (database, retention_policy, measurement):
Points are extracted from all entries and grouped into a BTreeMap<(db, rp, measurement), Vec<Point>>.
Step 3 -- Sort, sub-batch, and partition by hour:
For each measurement group:

1. Sort all points by timestamp.
2. Split into sub-batches of `max_points_per_batch`.
3. Within each sub-batch, call `partition_by_hour()` to bucket points by hour: `Vec<(hour_ts, Vec<Point>)>`.

Each `(db, rp, measurement, hour_ts, points)` tuple becomes a `FlushWork` item.
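A minimal sketch of the hour-bucketing step, assuming a simplified point that is just a nanosecond timestamp (the real `partition_by_hour()` works on full `Point` values):

```rust
use std::collections::BTreeMap;

const NANOS_PER_HOUR: i64 = 3_600 * 1_000_000_000;

/// Bucket points by the hour they fall into.
/// Returns (hour_start_ns, points) pairs in ascending hour order.
fn partition_by_hour(timestamps: &[i64]) -> Vec<(i64, Vec<i64>)> {
    let mut buckets: BTreeMap<i64, Vec<i64>> = BTreeMap::new();
    for &ts in timestamps {
        // Round down to the start of the containing hour.
        let hour_ts = ts.div_euclid(NANOS_PER_HOUR) * NANOS_PER_HOUR;
        buckets.entry(hour_ts).or_default().push(ts);
    }
    buckets.into_iter().collect()
}

fn main() {
    let parts = partition_by_hour(&[10, 20, NANOS_PER_HOUR + 5]);
    assert_eq!(parts.len(), 2); // two distinct hour buckets
    assert_eq!(parts[0], (0, vec![10, 20]));
    assert_eq!(parts[1].0, NANOS_PER_HOUR);
}
```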
Step 4 -- Phase 1: Parquet conversion (CPU-bound):
Each `FlushWork` is processed in `tokio::task::spawn_blocking`:

1. `parquet_writer::points_to_parquet(&points)` -- converts to Parquet bytes.
2. `make_unique_path()` -- appends a UUID suffix to avoid file collisions.
3. Compute `min_time` and `max_time` from point timestamps.
4. Compute a CRC32 checksum of the Parquet bytes.
All conversion tasks run in parallel on the blocking threadpool.
Step 5 -- Phase 2: Write and register (I/O-bound):
Each `FlushResult` is processed in `tokio::spawn`:

1. `storage.write_parquet(rel_path, data)` -- writes to local FS or S3.
2. `metadata.register_parquet_file(db, rp, measurement, full_path, min_time, max_time, Some(crc32))` -- registers in the Parquet file registry.
All write tasks run in parallel.
Step 6 -- Truncate WAL:
After all writes complete:

1. Compute the safe truncation point via `compute_safe_truncate_seq()`.
2. `wal.truncate_before(safe_seq + 1)` -- deletes processed entries.
3. Update the `last_flushed` cursor.
Memory-aware batch sizing¶
When `max_points_per_batch = 0` (default), `auto_detect_batch_size()` is called:

- On Linux: reads `/proc/meminfo` for `MemAvailable` and uses 25% of available memory.
- Elsewhere: defaults to 1 GB.
- Budget in points: `budget / (ESTIMATED_BYTES_PER_POINT * 3)` -- the factor of 3 accounts for the point itself, the Arrow arrays, and the Parquet output buffer.
- Clamped to `[10,000 .. 500,000]`.
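The arithmetic above can be sketched as follows; the function name `batch_size_from_available` is illustrative, and the memory figure is taken as a parameter rather than read from `/proc/meminfo`:

```rust
const ESTIMATED_BYTES_PER_POINT: u64 = 512;
const MIN_BATCH_POINTS: u64 = 10_000;
const MAX_BATCH_POINTS: u64 = 500_000;

/// Derive a point budget from an available-memory figure, then clamp it.
/// The factor of 3 covers the points, the Arrow arrays, and the Parquet buffer.
fn batch_size_from_available(available_bytes: u64) -> u64 {
    let budget = available_bytes / 4; // 25% of available memory
    (budget / (ESTIMATED_BYTES_PER_POINT * 3)).clamp(MIN_BATCH_POINTS, MAX_BATCH_POINTS)
}

fn main() {
    // 8 GiB available -> 2 GiB budget -> ~1.4M points, clamped to the 500k cap.
    assert_eq!(batch_size_from_available(8 * 1024 * 1024 * 1024), MAX_BATCH_POINTS);
    // Tiny machines clamp up to the 10k floor.
    assert_eq!(batch_size_from_available(32 * 1024 * 1024), MIN_BATCH_POINTS);
}
```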
Parallelism model¶
```
WAL entries
     |
     v
[Sequential]  Group, sort, partition
     |
     v
[Parallel spawn_blocking]  Parquet conversion (CPU)
     |    |    |   ...
     v    v    v
[Wait all]  Collect results
     |
     v
[Parallel tokio::spawn]  Write + register (I/O)
     |    |    |   ...
     v    v    v
[Wait all]  Confirm writes
     |
     v
[Sequential]  Truncate WAL
```
7. Arrow Schema Construction¶
File: src/adapters/storage/parquet_writer.rs
build_schema(tag_keys, field_types) -> Schema¶
Constructs an Arrow Schema with a deterministic column order:
| Position | Column | Arrow Type | Nullable |
|---|---|---|---|
| 0 | time | Timestamp(Nanosecond, Some("UTC")) | No |
| 1..N | Tag columns (sorted by key) | Utf8 | Yes |
| N+1..M | Field columns (sorted by key) | Varies by discriminant | Yes |
Field type mapping¶
| FieldValue Discriminant | Arrow DataType |
|---|---|
| 0 (`Float`) | `Float64` |
| 1 (`Integer`) | `Int64` |
| 2 (`UInteger`) | `UInt64` |
| 3 (`String`) | `Utf8` |
| 4 (`Boolean`) | `Boolean` |
| other (fallback) | Float64 |
Schema discovery in points_to_parquet()¶
When converting a batch of points (used during flush), points_to_parquet() discovers the schema dynamically:
- Walk all points and collect tag keys into a `BTreeSet<String>` (sorted, deduplicated).
- Walk all points and collect field types into a `BTreeMap<String, u8>`.
- Type conflict resolution: if the same field name appears with both `Integer` (discriminant 1) and `Float` (discriminant 0) across different points, the wider type `Float` (0) wins.
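The widening rule amounts to a fold over per-point field discriminants (numbered as in the mapping table above: 0 = Float, 1 = Integer). A sketch, with the function name `merge_field_types` assumed:

```rust
use std::collections::BTreeMap;

/// Merge one point's field type into the accumulated schema.
/// If a field shows up as both Integer (1) and Float (0), Float (0) wins.
fn merge_field_types(acc: &mut BTreeMap<String, u8>, field: &str, disc: u8) {
    acc.entry(field.to_string())
        .and_modify(|existing| {
            // Any Integer/Float mix widens to Float.
            if matches!((*existing, disc), (0, 1) | (1, 0)) {
                *existing = 0;
            }
        })
        .or_insert(disc);
}

fn main() {
    let mut schema = BTreeMap::new();
    merge_field_types(&mut schema, "value", 1); // first seen as Integer
    merge_field_types(&mut schema, "value", 0); // later seen as Float
    assert_eq!(schema["value"], 0); // widened to Float
}
```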
8. RecordBatch Building¶
File: src/adapters/storage/parquet_writer.rs
points_to_record_batch(points, schema, tag_keys, field_types) -> RecordBatch¶
Converts a slice of Point structs into an Arrow RecordBatch.
Algorithm¶
- Pre-allocate builders with `Vec::with_capacity(n)` for each column.
- Iterate points. For each point:
    - Append `timestamp` to the `TimestampNanosecondArray` builder.
    - For each tag key: if the point has that tag, push `Some(value)`, else `None`.
    - For each field key: if the point has that field, push `Some(FieldValue)`, else `None`.
- Build typed arrays:
    - Timestamps: `TimestampNanosecondArray` with UTC timezone.
    - Tags: `StringBuilder` -> `StringArray` (nullable).
    - Fields: type-specific arrays:
        - `Float64Array` for discriminant 0
        - `Int64Array` for discriminant 1
        - `UInt64Array` for discriminant 2
        - `StringArray` for discriminant 3
        - `BooleanArray` for discriminant 4
- Assemble columns: `[timestamps, ...tag_arrays, ...field_arrays]`.
- Create the `RecordBatch`: `RecordBatch::try_new(schema, columns)`.
Null handling¶
Missing tags and fields are represented as nulls in the Arrow arrays. This is the correct behavior for time-series data where not every point has every tag or field.
Error handling¶
- An empty points slice returns `HyperbytedbError::Storage("cannot create record batch from empty points")`.
- A schema mismatch in `RecordBatch::try_new` returns `HyperbytedbError::Storage`.
9. Parquet File Writing¶
File: src/adapters/storage/parquet_writer.rs
write_parquet(batch) -> Vec<u8>¶
Serializes a RecordBatch to Parquet bytes in memory.
Writer properties¶
| Property | Value | Rationale |
|---|---|---|
| Compression | ZSTD level 1 | Good compression ratio with fast decompression |
| Max row group size | 65,536 rows | Balances memory usage vs query granularity |
| Statistics | Page-level (EnabledStatistics::Page) | Enables predicate pushdown in chDB |
Process¶
- Create `WriterProperties` with the above settings.
- Create an `ArrowWriter` writing to a `Vec<u8>` buffer.
- `writer.write(batch)` -- encodes the RecordBatch.
- `writer.close()` -- flushes and finalizes the Parquet footer.
- Return the buffer.
End-to-end: points_to_parquet()¶
Combines schema discovery, RecordBatch building, and Parquet writing into a single call:
Returns Bytes::new() (empty) for an empty input slice, and Bytes::from(buf) otherwise.
10. Storage Path Layout¶
File: src/domain/storage_layout.rs
Path format¶
Relative paths follow the pattern `{db}/{rp}/{measurement}/{YYYY-MM-DD}/{HH}.parquet`, where the date and hour come from the point timestamps. Example: `mydb/autogen/cpu/2024-01-15/12.parquet`
Unique path generation¶
make_unique_path() appends an 8-character UUID suffix before .parquet to prevent overwrites when multiple flushes write to the same hour bucket:
```
Input:  mydb/autogen/cpu/2024-01-15/12.parquet
Output: mydb/autogen/cpu/2024-01-15/12_a3f8b1c2.parquet
```
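The suffixing logic can be sketched as below, taking the random suffix as a parameter instead of generating a UUID so the example stays dependency-free (the real implementation derives the 8-character suffix from a UUID):

```rust
/// Insert `_{suffix}` before the `.parquet` extension.
fn make_unique_path(path: &str, suffix: &str) -> String {
    match path.strip_suffix(".parquet") {
        Some(stem) => format!("{stem}_{suffix}.parquet"),
        None => format!("{path}_{suffix}"),
    }
}

fn main() {
    assert_eq!(
        make_unique_path("mydb/autogen/cpu/2024-01-15/12.parquet", "a3f8b1c2"),
        "mydb/autogen/cpu/2024-01-15/12_a3f8b1c2.parquet"
    );
}
```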
StorageLayout methods¶
| Method | Returns | Purpose |
|---|---|---|
| `parquet_path(db, rp, meas, ts_nanos)` | Absolute path | Full path with `base_path` prefix |
| `parquet_path_relative(db, rp, meas, ts_nanos)` | Relative path | Used by storage adapters |
| `glob_pattern(db, rp, meas, min_time, max_time)` | Glob string | For chDB `file()` queries |
| `parquet_glob(db, rp, meas)` | Glob string | All Parquet files for a measurement |
Storage backends¶
Both LocalStorage and S3Storage implement the StoragePort trait:
```rust
#[async_trait]
pub trait StoragePort: Send + Sync {
    async fn write_parquet(&self, path: &str, data: Bytes) -> Result<(), HyperbytedbError>;
    async fn read_parquet(&self, path: &str) -> Result<Bytes, HyperbytedbError>;
    async fn list_parquet_files(&self, prefix: &str) -> Result<Vec<String>, HyperbytedbError>;
    async fn delete_parquet(&self, path: &str) -> Result<(), HyperbytedbError>;
}
```
LocalStorage creates intermediate directories with create_dir_all before writing. Uses tokio::fs for async I/O.
S3Storage uses the object_store crate to interact with any S3-compatible service (AWS S3, MinIO, Cloudflare R2). Configured via [storage.s3].
11. File Registration¶
File: src/adapters/metadata/rocksdb_meta.rs
After writing a Parquet file, the flush service registers it in the metadata store.
Registration call¶
The flush service's write phase invokes `metadata.register_parquet_file(db, rp, measurement, full_path, min_time, max_time, Some(crc32))` for each file.
RocksDB key format¶
The key embeds the file's `min_time` as `min_time_hex`, a zero-padded 16-character hexadecimal string. This enables efficient prefix scanning for time-range queries.
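Zero-padded fixed-width hex keeps lexicographic key order aligned with numeric time order, the same trick as the WAL's big-endian sequence keys. A sketch of the formatting (the helper name is illustrative):

```rust
/// Format min_time as a zero-padded, 16-character hex string for the key.
fn min_time_hex(min_time: i64) -> String {
    format!("{:016x}", min_time as u64)
}

fn main() {
    assert_eq!(min_time_hex(255), "00000000000000ff");
    // Fixed width means string order matches numeric order for non-negative times.
    assert!(min_time_hex(100) < min_time_hex(1_000));
}
```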
Stored value¶
```json
{
  "path": "/var/lib/hyperbytedb/data/mydb/autogen/cpu/2024-01-15/12_abc123.parquet",
  "min_time": 1705312800000000000,
  "max_time": 1705316399999999999,
  "crc32": 2873465912
}
```
Generation counter¶
Each register_parquet_file and unregister_parquet_file call increments a monotonic generation counter. This is used by the Merkle tree cache to detect when trees need rebuilding.
12. WAL Truncation¶
File: src/application/flush_service.rs
Single-node mode¶
After all Parquet files are written and registered, the WAL is truncated up to the maximum processed sequence number. This deletes all entries with sequence < `chunk_max_seq + 1` using RocksDB's `delete_range_cf`.
Cluster-aware truncation¶
In cluster mode, compute_safe_truncate_seq() ensures the WAL is not truncated past what peers have acknowledged:
- Query `replication_log.min_wal_ack()` -- the minimum WAL sequence acknowledged by any peer.
- The safe truncation point is `min(chunk_max_seq, min_wal_ack)`.
- Edge case, no acks: if no peers have acked but active peers exist, hold the WAL, with a safety cap of `MAX_WAL_RETENTION_ENTRIES` (500,000) to prevent unbounded growth.
- Edge case, no peers: if no active peers exist, truncate normally.
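The decision rules above can be sketched as a single function. The signature and the exact cap behavior in the no-acks case are assumptions for illustration; the real `compute_safe_truncate_seq()` may differ in detail:

```rust
const MAX_WAL_RETENTION_ENTRIES: u64 = 500_000;

/// Pick the highest WAL sequence that is safe to truncate up to.
fn compute_safe_truncate_seq(
    chunk_max_seq: u64,
    min_wal_ack: Option<u64>,
    active_peers: usize,
) -> u64 {
    match (min_wal_ack, active_peers) {
        // No active peers: truncate everything we've flushed.
        (_, 0) => chunk_max_seq,
        // Peers acked up to some sequence: never truncate past it.
        (Some(ack), _) => chunk_max_seq.min(ack),
        // Peers exist but nothing acked: hold the WAL, capped so it
        // cannot retain more than MAX_WAL_RETENTION_ENTRIES entries.
        (None, _) => chunk_max_seq.saturating_sub(MAX_WAL_RETENTION_ENTRIES),
    }
}

fn main() {
    assert_eq!(compute_safe_truncate_seq(1_000, None, 0), 1_000);
    assert_eq!(compute_safe_truncate_seq(1_000, Some(400), 2), 400);
    assert_eq!(compute_safe_truncate_seq(1_000, None, 2), 0); // held back
    assert_eq!(compute_safe_truncate_seq(600_000, None, 2), 100_000);
}
```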
Mutation log truncation¶
After WAL truncation, the replication mutation log is also truncated based on min_mutation_ack() from all peers.
13. Cluster Write Replication¶
File: src/application/peer_ingestion_service.rs, src/cluster/peer_client.rs
PeerIngestionService¶
In cluster mode, PeerIngestionService wraps the standard ingestion service. After local WAL append succeeds:
- Build an `OutboundReplicationBatch` (raw line protocol bytes plus db/rp/precision) to send to peers over the v1 media type.
- Call `peer_client.replicate_write(request, wal_seq)` -- this spawns an async task.
- Return `204` to the client immediately (fire-and-forget).
Replication protocol¶
For each active peer (from `membership.active_peers()`):

- `POST /internal/replicate` with the line protocol body (the same bytes as the client write), `Content-Type` set to the InfluxDB v1 line-protocol media type, and database / retention policy / precision / origin carried in `X-Hyperbytedb-*` headers (see `peer_client.rs`).
- Include the header `X-Hyperbytedb-Replicated: true` to prevent re-replication loops.
- On success (2xx): call `replication_log.set_wal_ack(peer_id, wal_seq)`.
- On failure: retry with exponential backoff (1s base, 30s max) up to `replication_max_retries` (default 5).
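Under those parameters the retry delays double from the 1 s base up to the 30 s ceiling. A sketch of the schedule (the exact shape of the real backoff, e.g. jitter, is not specified here, so this is an assumption):

```rust
use std::time::Duration;

/// Exponential backoff: 1s base, doubling per attempt, capped at 30s.
fn backoff_delay(attempt: u32) -> Duration {
    let base_secs: u64 = 1;
    // 1, 2, 4, 8, 16, then capped at 30.
    let capped = (base_secs << attempt.min(5)).min(30);
    Duration::from_secs(capped)
}

fn main() {
    let schedule: Vec<u64> = (0..5).map(|a| backoff_delay(a).as_secs()).collect();
    assert_eq!(schedule, vec![1, 2, 4, 8, 16]);
    assert_eq!(backoff_delay(6).as_secs(), 30); // capped at the 30s max
}
```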
Receiving replicated writes¶
The receiving node checks for X-Hyperbytedb-Replicated: true. If present, it persists locally (WAL + metadata) but does not re-replicate.
14. Metrics¶
All metrics use the `metrics` crate with `metrics-exporter-prometheus`.
Write path metrics¶
| Metric | Type | Description |
|---|---|---|
| `hyperbytedb_write_requests_total` | counter | Total write requests received |
| `hyperbytedb_write_errors_total` | counter | Failed write requests |
| `hyperbytedb_write_payload_bytes` | histogram | Raw payload size in bytes |
| `hyperbytedb_write_duration_seconds` | histogram | Write handler latency |
| `hyperbytedb_ingestion_points_total` | counter | Points ingested |
| `hyperbytedb_wal_appends_total` | counter | WAL append operations |
Flush metrics¶
| Metric | Type | Description |
|---|---|---|
| `hyperbytedb_flush_runs_total` | counter | Flush cycles completed |
| `hyperbytedb_flush_errors_total` | counter | Failed flush cycles |
| `hyperbytedb_flush_points_total` | counter | Points flushed to Parquet |
| `hyperbytedb_flush_duration_seconds` | histogram | Flush cycle duration |
| `hyperbytedb_parquet_bytes_written_total` | counter | Total Parquet bytes written |
| `hyperbytedb_parquet_files_written_total` | counter | Total Parquet files written |
| `hyperbytedb_wal_last_sequence` | gauge | Last flushed WAL sequence |
Replication metrics¶
| Metric | Type | Description |
|---|---|---|
| `hyperbytedb_replication_writes_total` | counter | Replication attempts |
| `hyperbytedb_replication_errors_total` | counter | Failed replications |
| `hyperbytedb_replication_duration_seconds` | histogram | Replication latency |