
Deep Dive: Write Path

This document traces every step of the HyperbyteDB write path, from the moment an HTTP request arrives to the point where data is persisted as a Parquet file on disk (or S3). It covers line protocol ingestion, WAL durability, the background flush pipeline, Arrow schema construction, Parquet encoding, and cluster replication.


Table of Contents

  1. Overview
  2. HTTP Write Handler
  3. Line Protocol Parsing
  4. Metadata Registration
  5. WAL Append
  6. Flush Pipeline
  7. Arrow Schema Construction
  8. RecordBatch Building
  9. Parquet File Writing
  10. Storage Path Layout
  11. File Registration
  12. WAL Truncation
  13. Cluster Write Replication
  14. Metrics

1. Overview

The write path is split into two phases:

Phase 1 -- Synchronous (client-blocking): HTTP request handling, line protocol parsing, metadata registration, and WAL append. The client receives a 204 No Content response as soon as the WAL append completes. Data is durable at this point.

Phase 2 -- Asynchronous (background): The flush service periodically reads WAL entries, converts them to Arrow RecordBatches, writes Parquet files, registers them in metadata, and truncates the WAL.

Client POST /write
       |
       v
+-------------------------------+
| HTTP Handler (write.rs)       |
|  - Validate params            |
|  - Decompress gzip            |
|  - Cluster state check        |
+-------------------------------+
       |
       v
+-------------------------------+
| Ingestion Service             |
|  - Parse line protocol        |
|  - Register metadata          |
|  - Append to WAL              |
+-------------------------------+
       |
       v  (204 returned to client)
       |
       v  (background, every N seconds)
+-------------------------------+
| Flush Service                 |
|  - Read WAL entries           |
|  - Group by (db, rp, meas)    |
|  - Partition by hour          |
|  - Convert to Arrow           |
|  - Write Parquet files        |
|  - Register in metadata       |
|  - Truncate WAL               |
+-------------------------------+
       |
       v
+-------------------------------+
| Storage Backend               |
|  LocalStorage or S3Storage    |
+-------------------------------+

2. HTTP Write Handler

File: src/adapters/http/write.rs

Entry point

handle_write() handles POST /write requests.

Query parameters

Parameter Required Default Description
db Yes -- Target database name
rp No Default RP Retention policy name
precision No ns Timestamp precision: ns, us/u, ms, s
u No -- Username (when auth enabled)
p No -- Password (when auth enabled)

Processing steps

  1. Cluster state check -- In cluster mode, reject writes when the node's state is Draining, Leaving, Syncing, or Joining. Returns 503 Service Unavailable with an optional X-Hyperbytedb-Redirect header pointing to an active peer.
  2. Validate db -- If missing, returns 400 with "database is required".
  3. Gzip decompression -- If Content-Encoding: gzip, the body is decompressed via flate2::read::GzDecoder. This supports compressed payloads from Telegraf and other agents.
  4. Delegate to IngestionPort.ingest() -- Passes db, rp, precision, and the raw body bytes.

Metrics emitted

  • hyperbytedb_write_payload_bytes -- histogram of raw payload size
  • hyperbytedb_write_duration_seconds -- histogram of handler latency
  • hyperbytedb_write_requests_total -- counter
  • hyperbytedb_write_errors_total -- counter (on failure)

3. Line Protocol Parsing

File: src/application/ingestion_service.rs

Overview

The IngestionServiceImpl::ingest() method parses the body as InfluxDB line protocol and converts each line into a domain Point.

Line protocol format

measurement,tag1=val1,tag2=val2 field1=1.0,field2="str" 1609459200000000000

Parsing steps

  1. UTF-8 validation -- The body bytes are converted to &str.
  2. influxdb_line_protocol::parse_lines(input) -- Uses the influxdb-line-protocol crate to produce ParsedLine structs.
  3. parsed_line_to_point() -- Converts each ParsedLine to a domain Point:
    • Measurement from series.measurement.
    • Tags from the tag set (sorted BTreeMap<String, String>).
    • Fields via lp_field_value_to_domain():
      • LpFieldValue::F64 -> FieldValue::Float(f64)
      • LpFieldValue::I64 -> FieldValue::Integer(i64)
      • LpFieldValue::U64 -> FieldValue::UInteger(u64)
      • LpFieldValue::String -> FieldValue::String(String)
      • LpFieldValue::Boolean -> FieldValue::Boolean(bool)
  4. Timestamp -- If present, scaled to nanoseconds based on precision. If absent, defaults to chrono::Utc::now().timestamp_nanos().

Precision scaling

Precision Multiplier
s x 1,000,000,000
ms x 1,000,000
us / u x 1,000
ns (default) x 1
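
The scaling table can be sketched as a small helper (the function name is illustrative, not the actual HyperbyteDB code):

```rust
// Sketch of precision scaling: raw timestamps are multiplied up to
// nanoseconds before storage, according to the `precision` parameter.
fn scale_to_nanos(ts: i64, precision: &str) -> i64 {
    let multiplier: i64 = match precision {
        "s" => 1_000_000_000,
        "ms" => 1_000_000,
        "us" | "u" => 1_000,
        _ => 1, // "ns" is the default
    };
    ts * multiplier
}
```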

Error handling

A parse error on any line produces HyperbytedbError::LineProtocolParse { line, reason }, which maps to HTTP 400.


4. Metadata Registration

File: src/application/ingestion_service.rs

Before appending to the WAL, the ingestion service registers schema information in the metadata store.

Steps

  1. Resolve retention policy -- If rp is empty, resolve to the database's default retention policy (typically autogen).
  2. Verify database exists -- metadata.get_database(db) -- returns HyperbytedbError::DatabaseNotFound if missing.
  3. Group points by measurement -- Collect field_types (name -> discriminant) and tag_keys per measurement.
  4. Check field type compatibility -- metadata.check_field_types(db, measurement, &field_types) -- if a field was previously registered as Float (discriminant 0) and a write sends Integer, returns HyperbytedbError::FieldTypeConflict.
  5. Cardinality limits -- Check max_measurements_per_database and max_tag_values_per_measurement. Returns HyperbytedbError::CardinalityExceeded if either limit is hit.
  6. Register measurement -- metadata.register_measurement(db, measurement, field_types, tag_keys) -- upserts MeasurementMeta in the metadata store.
  7. Store tag values -- metadata.store_tag_value(db, measurement, tag_key, tag_value) for each unique tag value. These are used by SHOW TAG VALUES queries.

MeasurementMeta

struct MeasurementMeta {
    name: String,
    field_types: HashMap<String, u8>,  // field_name -> FieldValue discriminant
    tag_keys: Vec<String>,
}

Stored in RocksDB under key meas:{db}:{name} as JSON.


5. WAL Append

Files: src/ports/wal.rs, src/adapters/wal/rocksdb_wal.rs

WalEntry structure

struct WalEntry {
    database: String,
    retention_policy: String,
    points: Vec<Point>,
}

All points from a single write request are grouped into one WalEntry.

RocksDB WAL implementation

The WAL uses RocksDB with two column families:

Column Family Purpose
wal Ordered entries. Keys are big-endian u64 sequence numbers. Values are bincode-serialized WalEntry.
wal_meta Single key last_seq storing the current sequence as big-endian u64.

Append operation

  1. fetch_add on an AtomicU64 to get the next sequence number.
  2. bincode::serialize(&entry) the WalEntry.
  3. Create a WriteBatch with:
    • put_cf("wal", big_endian(seq), serialized_entry)
    • put_cf("wal_meta", "last_seq", big_endian(seq))
  4. db.write(batch) -- atomic, durable write.
  5. Return the sequence number.

Key encoding

Sequence numbers are stored as big-endian u64 (8 bytes). This preserves numerical ordering in RocksDB's lexicographic key space, enabling efficient forward iteration and range deletion.
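
A minimal sketch of why big-endian works here (and little-endian would not):

```rust
// Big-endian encoding keeps numeric order and byte order aligned:
// 255 -> [0,...,0,255] sorts before 256 -> [0,...,1,0] under the
// lexicographic comparison RocksDB applies to keys.
fn seq_key(seq: u64) -> [u8; 8] {
    seq.to_be_bytes()
}
```

With little-endian bytes, 255 would encode as [255, 0, ...] and sort after 256's [0, 1, ...], breaking forward iteration order.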

Durability

The write is durable the moment db.write(batch) returns. RocksDB's WAL (not to be confused with HyperbyteDB's WAL concept) ensures fsync semantics.


6. Flush Pipeline

File: src/application/flush_service.rs

The flush service is the bridge between the WAL and Parquet storage. It runs as a background Tokio task.

Constants

Constant Value Purpose
ESTIMATED_BYTES_PER_POINT 512 Memory budget calculation
WAL_READ_CHUNK 5,000 Max WAL entries per read batch
MIN_BATCH_POINTS 10,000 Minimum points per Parquet batch
MAX_BATCH_POINTS 500,000 Maximum points per Parquet batch
MAX_WAL_RETENTION_ENTRIES 500,000 WAL cap when peers are lagging

Lifecycle

  1. Timer tick every flush.interval_secs (default 10s).
  2. On tick, call flush().
  3. On shutdown signal, exit the loop. The drain service calls drain() for a final full flush.

flush() step by step

Step 1 -- Read WAL entries:

entries = wal.read_range(cursor + 1, WAL_READ_CHUNK)

Reads up to 5,000 entries from the WAL starting after the last flushed sequence. If empty, the flush is a no-op. If there are more than 5,000 entries, the outer loop reads successive chunks.

Step 2 -- Group by (database, retention_policy, measurement):

Points are extracted from all entries and grouped into a BTreeMap<(db, rp, measurement), Vec<Point>>.
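
The grouping can be sketched as follows; the Point and WalEntry shapes are simplified for illustration (tags and fields are omitted, and the measurement is shown on the point):

```rust
use std::collections::BTreeMap;

// Simplified shapes for illustration only.
struct Point { measurement: String, timestamp: i64 }
struct WalEntry { database: String, retention_policy: String, points: Vec<Point> }

type GroupKey = (String, String, String); // (db, rp, measurement)

// Sketch of Step 2: flatten WAL entries into per-measurement buckets.
fn group_points(entries: Vec<WalEntry>) -> BTreeMap<GroupKey, Vec<Point>> {
    let mut groups: BTreeMap<GroupKey, Vec<Point>> = BTreeMap::new();
    for entry in entries {
        for point in entry.points {
            let key = (entry.database.clone(),
                       entry.retention_policy.clone(),
                       point.measurement.clone());
            groups.entry(key).or_default().push(point);
        }
    }
    groups
}
```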

Step 3 -- Sort, sub-batch, and partition by hour:

For each measurement group:

  1. Sort all points by timestamp.
  2. Split into sub-batches of max_points_per_batch.
  3. Within each sub-batch, call partition_by_hour() to bucket points by hour:

hour_start = (timestamp / NANOS_PER_HOUR) * NANOS_PER_HOUR

This produces Vec<(hour_ts, Vec<Point>)>.

Each (db, rp, measurement, hour_ts, points) tuple becomes a FlushWork item.
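
The hour bucketing can be sketched over bare timestamps (the real function carries whole Points; this sketch assumes non-negative nanosecond timestamps):

```rust
const NANOS_PER_HOUR: i64 = 3_600 * 1_000_000_000;

// Sketch of partition_by_hour() over timestamp-sorted input: because
// the points are already sorted, each hour bucket is a contiguous run,
// so a single pass suffices.
fn partition_by_hour(sorted_ts: &[i64]) -> Vec<(i64, Vec<i64>)> {
    let mut buckets: Vec<(i64, Vec<i64>)> = Vec::new();
    for &ts in sorted_ts {
        let hour_start = (ts / NANOS_PER_HOUR) * NANOS_PER_HOUR;
        match buckets.last_mut() {
            Some((h, bucket)) if *h == hour_start => bucket.push(ts),
            _ => buckets.push((hour_start, vec![ts])),
        }
    }
    buckets
}
```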

Step 4 -- Phase 1: Parquet conversion (CPU-bound):

Each FlushWork is processed in tokio::task::spawn_blocking:

  1. parquet_writer::points_to_parquet(&points) -- converts to Parquet bytes.
  2. make_unique_path() -- appends a UUID suffix to avoid file collisions.
  3. Compute min_time and max_time from point timestamps.
  4. Compute a CRC32 checksum of the Parquet bytes.

All conversion tasks run in parallel on the blocking threadpool.

Step 5 -- Phase 2: Write and register (I/O-bound):

Each FlushResult is processed in tokio::spawn:

  1. storage.write_parquet(rel_path, data) -- writes to local FS or S3.
  2. metadata.register_parquet_file(db, rp, measurement, full_path, min_time, max_time, Some(crc32)) -- registers in the Parquet file registry.

All write tasks run in parallel.

Step 6 -- Truncate WAL:

After all writes complete:

  1. Compute the safe truncation point via compute_safe_truncate_seq().
  2. wal.truncate_before(safe_seq + 1) -- deletes processed entries.
  3. Update the last_flushed cursor.

Memory-aware batch sizing

When max_points_per_batch = 0 (default), auto_detect_batch_size() is called:

  1. On Linux: reads /proc/meminfo for MemAvailable, uses 25% of available memory.
  2. Elsewhere: defaults to 1 GB.
  3. Budget in points: budget / (ESTIMATED_BYTES_PER_POINT * 3) -- the factor of 3 accounts for the point itself, the Arrow arrays, and the Parquet output buffer.
  4. Clamped to [10,000 .. 500,000].
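
The sizing arithmetic can be sketched as follows (the /proc/meminfo probe and the 1 GB non-Linux fallback are omitted, and the function name is illustrative):

```rust
const ESTIMATED_BYTES_PER_POINT: u64 = 512;
const MIN_BATCH_POINTS: u64 = 10_000;
const MAX_BATCH_POINTS: u64 = 500_000;

// Sketch: turn an available-memory figure into a clamped point budget.
fn batch_size_for(available_bytes: u64) -> u64 {
    let budget = available_bytes / 4; // 25% of available memory
    // Factor of 3: the point itself, the Arrow arrays, the Parquet buffer.
    (budget / (ESTIMATED_BYTES_PER_POINT * 3)).clamp(MIN_BATCH_POINTS, MAX_BATCH_POINTS)
}
```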

Parallelism model

WAL entries
    |
    v
[Sequential] Group, sort, partition
    |
    v
[Parallel spawn_blocking] Parquet conversion (CPU)
    |  |  |  ...
    v  v  v
[Wait all] Collect results
    |
    v
[Parallel tokio::spawn] Write + register (I/O)
    |  |  |  ...
    v  v  v
[Wait all] Confirm writes
    |
    v
[Sequential] Truncate WAL

7. Arrow Schema Construction

File: src/adapters/storage/parquet_writer.rs

build_schema(tag_keys, field_types) -> Schema

Constructs an Arrow Schema with a deterministic column order:

Position Column Arrow Type Nullable
0 time Timestamp(Nanosecond, Some("UTC")) No
1..N Tag columns (sorted by key) Utf8 Yes
N+1..M Field columns (sorted by key) Varies by discriminant Yes

Field type mapping

FieldValue Discriminant Arrow DataType
0 (Float) Float64
1 (Integer) Int64
2 (UInteger) UInt64
3 (String) Utf8
4 (Boolean) Boolean
other (fallback) Float64

Schema discovery in points_to_parquet()

When converting a batch of points (used during flush), points_to_parquet() discovers the schema dynamically:

  1. Walk all points and collect tag keys into a BTreeSet<String> (sorted, deduplicated).
  2. Walk all points and collect field types into a BTreeMap<String, u8>.
  3. Type conflict resolution: If the same field name appears with both Integer (discriminant 1) and Float (discriminant 0) across different points, the wider Float type (mapped to Arrow Float64) wins.
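
The widening rule can be sketched as a merge step (the helper name is illustrative; only the Integer/Float case is resolved here):

```rust
use std::collections::BTreeMap;

// FieldValue discriminants from the mapping below: 0 = Float, 1 = Integer.
const FLOAT: u8 = 0;
const INTEGER: u8 = 1;

// Sketch: when the same field arrives as both Integer and Float across
// points, record it as Float; other mismatches are left to upstream
// type checks in this simplified version.
fn merge_field_type(types: &mut BTreeMap<String, u8>, name: &str, disc: u8) {
    match types.get(name).copied() {
        None => { types.insert(name.to_string(), disc); }
        Some(prev) if prev == disc => {}
        Some(prev) if (prev, disc) == (FLOAT, INTEGER)
                   || (prev, disc) == (INTEGER, FLOAT) => {
            types.insert(name.to_string(), FLOAT);
        }
        Some(_) => {}
    }
}
```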

8. RecordBatch Building

File: src/adapters/storage/parquet_writer.rs

points_to_record_batch(points, schema, tag_keys, field_types) -> RecordBatch

Converts a slice of Point structs into an Arrow RecordBatch.

Algorithm

  1. Pre-allocate builders with Vec::with_capacity(n) for each column.
  2. Iterate points -- for each point:
    • Append the timestamp to the TimestampNanosecondArray builder.
    • For each tag key: if the point has that tag, push Some(value), else None.
    • For each field key: if the point has that field, push Some(FieldValue), else None.
  3. Build typed arrays:
    • Timestamps: TimestampNanosecondArray with UTC timezone.
    • Tags: StringBuilder -> StringArray (nullable).
    • Fields: type-specific arrays:
      • Float64Array for discriminant 0
      • Int64Array for discriminant 1
      • UInt64Array for discriminant 2
      • StringArray for discriminant 3
      • BooleanArray for discriminant 4
  4. Assemble columns: [timestamps, ...tag_arrays, ...field_arrays].
  5. Create RecordBatch: RecordBatch::try_new(schema, columns).

Null handling

Missing tags and fields are represented as nulls in the Arrow arrays. This is the correct behavior for time-series data where not every point has every tag or field.

Error handling

  • Empty points slice returns HyperbytedbError::Storage("cannot create record batch from empty points").
  • Schema mismatch in RecordBatch::try_new returns HyperbytedbError::Storage.

9. Parquet File Writing

File: src/adapters/storage/parquet_writer.rs

write_parquet(batch) -> Vec<u8>

Serializes a RecordBatch to Parquet bytes in memory.

Writer properties

Property Value Rationale
Compression ZSTD level 1 Good compression ratio with fast decompression
Max row group size 65,536 rows Balances memory usage vs query granularity
Statistics Page-level (EnabledStatistics::Page) Enables predicate pushdown in chDB

Process

  1. Create WriterProperties with the above settings.
  2. Create ArrowWriter writing to a Vec<u8> buffer.
  3. writer.write(batch) -- encodes the RecordBatch.
  4. writer.close() -- flushes and finalizes the Parquet footer.
  5. Return the buffer.

End-to-end: points_to_parquet()

Combines schema discovery, RecordBatch building, and Parquet writing into a single call:

points -> discover schema -> build RecordBatch -> write Parquet -> Bytes

Returns Bytes::new() (empty) for an empty input slice, and Bytes::from(buf) otherwise.


10. Storage Path Layout

File: src/domain/storage_layout.rs

Path format

{db}/{rp}/{measurement}/{YYYY-MM-DD}/{HH}.parquet

Example: mydb/autogen/cpu/2024-01-15/12.parquet

Unique path generation

make_unique_path() appends an 8-character UUID suffix before .parquet to prevent overwrites when multiple flushes write to the same hour bucket:

Input:  mydb/autogen/cpu/2024-01-15/12.parquet
Output: mydb/autogen/cpu/2024-01-15/12_a3f8b1c2.parquet
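
The splice can be sketched as follows; the real make_unique_path() derives the suffix from a UUID, so a suffix parameter stands in here for testability:

```rust
// Sketch: insert a suffix in front of the .parquet extension so that
// repeated flushes into the same hour bucket never overwrite each other.
fn make_unique_path(path: &str, suffix: &str) -> String {
    match path.strip_suffix(".parquet") {
        Some(stem) => format!("{stem}_{suffix}.parquet"),
        None => format!("{path}_{suffix}"),
    }
}
```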

StorageLayout methods

Method Returns Purpose
parquet_path(db, rp, meas, ts_nanos) Absolute path Full path with base_path prefix
parquet_path_relative(db, rp, meas, ts_nanos) Relative path Used by storage adapters
glob_pattern(db, rp, meas, min_time, max_time) Glob string For chDB file() queries
parquet_glob(db, rp, meas) Glob string All Parquet files for a measurement

Storage backends

Both LocalStorage and S3Storage implement the StoragePort trait:

#[async_trait]
pub trait StoragePort: Send + Sync {
    async fn write_parquet(&self, path: &str, data: Bytes) -> Result<(), HyperbytedbError>;
    async fn read_parquet(&self, path: &str) -> Result<Bytes, HyperbytedbError>;
    async fn list_parquet_files(&self, prefix: &str) -> Result<Vec<String>, HyperbytedbError>;
    async fn delete_parquet(&self, path: &str) -> Result<(), HyperbytedbError>;
}

LocalStorage creates intermediate directories with create_dir_all before writing. Uses tokio::fs for async I/O.

S3Storage uses the object_store crate to interact with any S3-compatible service (AWS S3, MinIO, Cloudflare R2). Configured via [storage.s3].


11. File Registration

File: src/adapters/metadata/rocksdb_meta.rs

After writing a Parquet file, the flush service registers it in the metadata store.

Registration call

metadata.register_parquet_file(db, rp, measurement, path, min_time, max_time, Some(crc32))

RocksDB key format

pq:{db}:{rp}:{measurement}:{min_time_hex}

Where min_time_hex is the min_time formatted as a zero-padded 16-character hexadecimal string. This enables efficient prefix scanning for time-range queries.
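
The key layout can be sketched as follows (the function name is illustrative):

```rust
// Sketch of the registry key: a zero-padded 16-character hex min_time
// keeps keys in time order under RocksDB's lexicographic prefix scans
// (for non-negative timestamps).
fn parquet_file_key(db: &str, rp: &str, measurement: &str, min_time: i64) -> String {
    format!("pq:{db}:{rp}:{measurement}:{min_time:016x}")
}
```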

Stored value

{
  "path": "/var/lib/hyperbytedb/data/mydb/autogen/cpu/2024-01-15/12_abc123.parquet",
  "min_time": 1705312800000000000,
  "max_time": 1705316399999999999,
  "crc32": 2873465912
}

Generation counter

Each register_parquet_file and unregister_parquet_file call increments a monotonic generation counter. This is used by the Merkle tree cache to detect when trees need rebuilding.


12. WAL Truncation

File: src/application/flush_service.rs

Single-node mode

After all Parquet files are written and registered, the WAL is truncated up to the maximum processed sequence number:

wal.truncate_before(chunk_max_seq + 1)

This deletes all entries with sequence < chunk_max_seq + 1 using RocksDB's delete_range_cf.

Cluster-aware truncation

In cluster mode, compute_safe_truncate_seq() ensures the WAL is not truncated past what peers have acknowledged:

  1. Query replication_log.min_wal_ack() -- the minimum WAL sequence acknowledged by any peer.
  2. The safe truncation point is min(chunk_max_seq, min_wal_ack).
  3. Edge case -- no acks: If no peers have acked but active peers exist, hold the WAL with a safety cap of MAX_WAL_RETENTION_ENTRIES (500,000) to prevent unbounded growth.
  4. Edge case -- no peers: If no active peers exist, truncate normally.
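
The decision can be sketched as follows; the signature and the Option-based "hold" return are illustrative, and enforcement of the MAX_WAL_RETENTION_ENTRIES cap is omitted:

```rust
// Sketch of compute_safe_truncate_seq(). Returning None means
// "do not truncate yet" (subject, in the real code, to the
// 500,000-entry safety cap).
fn safe_truncate_seq(
    chunk_max_seq: u64,
    min_wal_ack: Option<u64>, // lowest sequence acked by any peer
    has_active_peers: bool,
) -> Option<u64> {
    match (min_wal_ack, has_active_peers) {
        // Normal cluster case: never pass the slowest peer.
        (Some(ack), _) => Some(chunk_max_seq.min(ack)),
        // No acks but peers exist: hold the WAL for them.
        (None, true) => None,
        // No active peers: single-node behavior.
        (None, false) => Some(chunk_max_seq),
    }
}
```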

Mutation log truncation

After WAL truncation, the replication mutation log is also truncated based on min_mutation_ack() from all peers.


13. Cluster Write Replication

File: src/application/peer_ingestion_service.rs, src/cluster/peer_client.rs

PeerIngestionService

In cluster mode, PeerIngestionService wraps the standard ingestion service. After local WAL append succeeds:

  1. Send an OutboundReplicationBatch (raw line protocol bytes + db/rp/precision) to peers over the v1 media type.
  2. Call peer_client.replicate_write(request, wal_seq) -- this spawns an async task.
  3. Return 204 to the client immediately (fire-and-forget).

Replication protocol

For each active peer (from membership.active_peers()):

  1. POST /internal/replicate with the line protocol body (same bytes as the client write), Content-Type set to the InfluxDB v1 line-protocol media type, and database / retention / precision / origin carried in X-Hyperbytedb-* headers (see peer_client.rs).
  2. Include header X-Hyperbytedb-Replicated: true to prevent re-replication loops.
  3. On success (2xx): call replication_log.set_wal_ack(peer_id, wal_seq).
  4. On failure: retry with exponential backoff (1s base, 30s max) up to replication_max_retries (default 5).
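
The retry schedule can be sketched as a delay function (a doubling schedule from the 1s base is assumed; any jitter is omitted):

```rust
use std::time::Duration;

// Sketch: exponential backoff from a 1s base, capped at 30s.
// attempt 0 -> 1s, 1 -> 2s, 2 -> 4s, ..., then 30s thereafter.
fn backoff_delay(attempt: u32) -> Duration {
    let base = Duration::from_secs(1);
    let cap = Duration::from_secs(30);
    base.saturating_mul(1u32 << attempt.min(31)).min(cap)
}
```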

Receiving replicated writes

The receiving node checks for X-Hyperbytedb-Replicated: true. If present, it persists locally (WAL + metadata) but does not re-replicate.


14. Metrics

All metrics use the metrics crate with metrics-exporter-prometheus.

Write path metrics

Metric Type Description
hyperbytedb_write_requests_total counter Total write requests received
hyperbytedb_write_errors_total counter Failed write requests
hyperbytedb_write_payload_bytes histogram Raw payload size in bytes
hyperbytedb_write_duration_seconds histogram Write handler latency
hyperbytedb_ingestion_points_total counter Points ingested
hyperbytedb_wal_appends_total counter WAL append operations

Flush metrics

Metric Type Description
hyperbytedb_flush_runs_total counter Flush cycles completed
hyperbytedb_flush_errors_total counter Failed flush cycles
hyperbytedb_flush_points_total counter Points flushed to Parquet
hyperbytedb_flush_duration_seconds histogram Flush cycle duration
hyperbytedb_parquet_bytes_written_total counter Total Parquet bytes written
hyperbytedb_parquet_files_written_total counter Total Parquet files written
hyperbytedb_wal_last_sequence gauge Last flushed WAL sequence

Replication metrics

Metric Type Description
hyperbytedb_replication_writes_total counter Replication attempts
hyperbytedb_replication_errors_total counter Failed replications
hyperbytedb_replication_duration_seconds histogram Replication latency