Deep Dive: Read Path¶
This document traces every step of the HyperbyteDB query path, from HTTP request to InfluxDB-compatible JSON response. It covers InfluxQL parsing, AST construction, ClickHouse SQL translation, chDB execution, result formatting, chunked execution for large datasets, and cluster query fan-out.
Table of Contents¶
- Overview
- HTTP Query Handler
- InfluxQL Parser
- AST Structure
- Query Dispatch
- SELECT Execution Flow
- File Source Resolution
- InfluxQL to ClickHouse Translation
- Tombstone Injection
- chDB Execution
- Result Parsing
- Chunked Execution
- SLIMIT and SOFFSET
- SELECT INTO
- Cluster Query Behavior
- Metrics
1. Overview¶
Client GET/POST /query
|
v
+------------------------------------+
| HTTP Handler (query.rs) |
| - Extract q, db, epoch, params |
| - Substitute bind parameters |
| - Delegate to QueryService |
+------------------------------------+
|
v
+------------------------------------+
| QueryServiceImpl::execute_query() |
| - Parse InfluxQL -> Vec<Statement>|
| - Timeout wrapper |
| - Dispatch each statement |
+------------------------------------+
|
v (SELECT statements)
+------------------------------------+
| execute_measurement_query() |
| - Resolve file source (paths/glob)|
| - Translate InfluxQL -> CH SQL |
| - Inject tombstone predicates |
| - Execute via chDB |
| - Parse JSONEachRow -> Series |
+------------------------------------+
|
v
+------------------------------------+
| chDB (embedded ClickHouse) |
| - file() table over Parquet files |
| - Returns JSONEachRow |
+------------------------------------+
2. HTTP Query Handler¶
File: src/adapters/http/query.rs
Entry points¶
- `handle_query_get()` -- GET `/query?q=...&db=...`
- `handle_query_post()` -- POST `/query` with form-encoded or query-string params
POST merges form body parameters into the query string parameters (body takes precedence).
Query parameters¶
| Parameter | Required | Default | Description |
|---|---|---|---|
| `q` | Yes | -- | InfluxQL query string |
| `db` | Depends | -- | Required for data queries; not for SHOW DATABASES |
| `epoch` | No | RFC3339 | Timestamp format: `ns`, `us`/`u`, `ms`, `s` |
| `pretty` | No | false | Pretty-print JSON |
| `chunked` | No | false | Stream results per statement |
| `params` | No | -- | JSON object for `$param` bind parameter substitution |
Bind parameter substitution¶
When params is provided (JSON object), $paramName placeholders in the query string are replaced with the corresponding values:
q = SELECT mean("value") FROM "cpu" WHERE "host" = $host
params = {"host": "'server01'"}
Result: SELECT mean("value") FROM "cpu" WHERE "host" = 'server01'
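A minimal sketch of this substitution in Rust (illustrative only: `substitute_params` is a hypothetical name, and the real handler may tokenize the query rather than do plain text replacement):

```rust
use std::collections::HashMap;

// Replace each $name placeholder with its value from the params map.
// Longer names are substituted first so that a `$host` replacement
// does not clobber a `$hostname` placeholder.
fn substitute_params(query: &str, params: &HashMap<String, String>) -> String {
    let mut keys: Vec<&String> = params.keys().collect();
    keys.sort_by(|a, b| b.len().cmp(&a.len()));
    let mut out = query.to_string();
    for key in keys {
        out = out.replace(&format!("${}", key), &params[key]);
    }
    out
}
```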
Output formats¶
| Accept Header | Format |
|---|---|
application/json (default) | InfluxDB v1 JSON |
text/csv or application/csv | CSV |
Statement summary tracking¶
If `statement_summary.enabled = true`, each executed statement is recorded with:
- Statement type and normalized digest
- Execution duration
- Error status
Results are available via `GET /api/v1/statements`.
3. InfluxQL Parser¶
File: src/influxql/parser.rs
Architecture¶
The parser is a hand-rolled recursive descent parser with no parser generator dependency. It operates on string slices.
Parse flow¶
Input: "SELECT mean(\"value\") FROM \"cpu\"; SHOW DATABASES"
|
v
split_statements(";")
|
+----> "SELECT mean(\"value\") FROM \"cpu\""
| |
| v
| parse_statement()
| first token = "SELECT" -> parse_select()
| |
| v
| SelectStatement { fields, from, condition, ... }
|
+----> "SHOW DATABASES"
|
v
parse_statement()
first token = "SHOW" -> parse_show()
|
v
Statement::ShowDatabases
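The statement split shown above can be sketched as follows (illustrative: at minimum the splitter must respect single-quoted string literals so a `;` inside a literal does not end a statement):

```rust
// Split an InfluxQL input into individual statements on ';',
// skipping semicolons that appear inside single-quoted strings.
fn split_statements(input: &str) -> Vec<String> {
    let mut stmts = Vec::new();
    let mut current = String::new();
    let mut in_string = false;
    for ch in input.chars() {
        match ch {
            '\'' => {
                in_string = !in_string;
                current.push(ch);
            }
            ';' if !in_string => {
                if !current.trim().is_empty() {
                    stmts.push(current.trim().to_string());
                }
                current.clear();
            }
            _ => current.push(ch),
        }
    }
    if !current.trim().is_empty() {
        stmts.push(current.trim().to_string());
    }
    stmts
}
```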
Statement dispatch¶
The parser examines the first keyword (case-insensitive):
| First Token | Handler | Statement Types |
|---|---|---|
SELECT | parse_select() | SelectStatement |
SHOW | parse_show() | ShowDatabases, ShowMeasurements, ShowTagKeys, ShowTagValues, ShowFieldKeys, ShowSeries, ShowRetentionPolicies, ShowUsers, ShowContinuousQueries |
CREATE | parse_create() | CreateDatabase, CreateRetentionPolicy, CreateUser, CreateContinuousQuery |
DROP | parse_drop() | DropDatabase, DropMeasurement, DropRetentionPolicy, DropUser, DropContinuousQuery |
DELETE | parse_delete() | Delete |
ALTER | parse_alter() | AlterRetentionPolicy |
SET | parse_set_password() | SetPassword |
GRANT | parse_grant() | Grant |
REVOKE | parse_revoke() | Revoke |
Expression parser¶
The SELECT field list and WHERE clause use a precedence-climbing expression parser:
| Precedence | Operators |
|---|---|
| 1 (lowest) | OR |
| 2 | AND |
| 3 | =, !=, <>, <, <=, >, >=, =~, !~ |
| 4 | +, - |
| 5 | *, /, % |
| 6 (highest) | Unary -, NOT |
Atoms¶
Atom types recognized by parse_atom():
- Identifiers: bare (`cpu`) or quoted (`"cpu"`)
- String literals: `'value'`
- Integer / float literals
- Duration literals: `1h`, `30s`, `5m`, `1d`, `4w`, `100ms`, `500u`, `1ns`
- `now()` function
- Function calls: `mean(...)`, `derivative(...)`, `count(*)`
- Wildcard: `*`
- Regex: `/pattern/`
- Subqueries: `(SELECT ...)`
SELECT parsing¶
parse_select() uses split_clauses() to find keyword boundaries (INTO, FROM, WHERE, GROUP BY, ORDER BY, LIMIT, OFFSET, SLIMIT, SOFFSET, TZ), then parses each clause independently.
4. AST Structure¶
File: src/influxql/ast.rs
Key types¶
Statement -- Top-level enum with variants for every supported statement.
SelectStatement -- The primary query type:
| Field | Type | Description |
|---|---|---|
fields | Vec<Field> | SELECT expressions with optional aliases |
into | Option<Measurement> | INTO target measurement |
from | Vec<MeasurementSource> | FROM source(s) |
condition | Option<Expr> | WHERE clause |
group_by | Option<GroupBy> | GROUP BY dimensions |
order_by | Option<OrderBy> | ORDER BY |
limit | Option<u64> | LIMIT |
offset | Option<u64> | OFFSET |
slimit | Option<u64> | SLIMIT (series limit) |
soffset | Option<u64> | SOFFSET (series offset) |
fill | Option<FillOption> | fill() option |
timezone | Option<String> | TZ() |
Expr -- Recursive expression tree:
- Identifier(String), Star, StringLiteral(String), IntegerLiteral(i64), FloatLiteral(f64)
- DurationLiteral(Duration), RegexLiteral(String), BoolLiteral(bool)
- Call(FunctionCall), BinaryExpr(Box<BinaryExpr>), UnaryExpr { op, expr }
- Now, FieldRef { name, data_type }, Subquery(Box<SelectStatement>)
MeasurementSource -- Concrete(Measurement) or Subquery(Box<SelectStatement>)
GroupBy -- dimensions: Time(Duration, Option<Duration>), Tag(String), Regex(String)
FillOption -- Null, None, Previous, Linear, Value(f64)
Duration -- value: i64, unit: String with to_nanos() and to_clickhouse_interval()
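A plausible sketch of `to_nanos()`, assuming the standard InfluxQL unit multipliers (the struct shape mirrors the description above; the real implementation may differ):

```rust
// Duration as described above: a value plus a unit string.
struct Duration {
    value: i64,
    unit: String,
}

impl Duration {
    // Convert to nanoseconds using InfluxQL's duration units.
    fn to_nanos(&self) -> i64 {
        let mult: i64 = match self.unit.as_str() {
            "ns" => 1,
            "u" | "us" => 1_000,
            "ms" => 1_000_000,
            "s" => 1_000_000_000,
            "m" => 60 * 1_000_000_000,
            "h" => 3_600 * 1_000_000_000,
            "d" => 86_400 * 1_000_000_000,
            "w" => 7 * 86_400 * 1_000_000_000,
            _ => 1,
        };
        self.value * mult
    }
}
```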
5. Query Dispatch¶
File: src/application/query_service.rs
execute_query(db, query, epoch)¶
- Wrap the entire execution in `tokio::time::timeout(query_timeout_secs)`.
- Parse the InfluxQL string via `influxql::parse(query)` -> `Vec<Statement>`.
- For each statement, call `execute_statement()` with a `statement_id` (0, 1, 2...).
execute_statement() routing¶
| Statement | Action |
|---|---|
ShowDatabases | metadata.list_databases() |
ShowMeasurements | metadata.list_measurements(db) |
ShowTagKeys | metadata.list_tag_keys(db, measurement) |
ShowTagValues | metadata.list_tag_values(db, key, measurement) with key filtering (Eq, Neq, Regex, In, All) |
ShowFieldKeys | metadata.get_measurement(db, m) -> field_types |
ShowRetentionPolicies | metadata.list_retention_policies(db) |
ShowSeries | metadata.list_series(db, measurement) |
ShowUsers | metadata.list_users() |
ShowContinuousQueries | metadata.list_continuous_queries() |
CreateDatabase | metadata.create_database(name) |
DropDatabase | metadata.drop_database(name) |
DropMeasurement | metadata.drop_measurement(db, name) |
CreateRetentionPolicy | metadata.create_retention_policy(db, rp) |
AlterRetentionPolicy | metadata.alter_retention_policy(db, name, ...) |
DropRetentionPolicy | metadata.drop_retention_policy(db, name) |
CreateUser | metadata.create_user(username, password_hash, admin) |
DropUser | metadata.drop_user(username) |
SetPassword | metadata.set_password(username, new_hash) |
Delete | metadata.store_tombstone(db, measurement, predicate_sql) |
CreateContinuousQuery | metadata.create_continuous_query(db, cq_def) |
DropContinuousQuery | metadata.drop_continuous_query(db, name) |
Grant / Revoke | Parsed but no-op |
Select | See next section |
6. SELECT Execution Flow¶
Overview¶
- Require non-empty `db`.
- Resolve the default retention policy via `metadata.get_default_rp(db)`.
- Extract time bounds from the WHERE clause via `to_clickhouse::extract_time_bounds()`.
- Determine GROUP BY tag dimensions for series grouping.
- Dispatch based on FROM source type.
Concrete measurement (single name)¶
Call execute_measurement_query() with the measurement name.
Regex measurement (FROM /^cpu.*/)¶
- List all measurements via `metadata.list_measurements(db)`.
- Filter by regex using `regex_pattern_matches()`.
- Execute `execute_measurement_query()` in parallel for each matching measurement via `futures::future::join_all`.
- Combine all series results.
Subquery (FROM (SELECT ...))¶
- Translate the inner SELECT to ClickHouse SQL.
- Wrap the outer SELECT using `translate_with_source()` with the inner SQL as the FROM clause.
- Execute the combined SQL via chDB.
execute_measurement_query()¶
- Call `resolve_file_source()` to determine file paths and count.
- Decide between single and chunked execution based on file count and time span.
- Translate InfluxQL to ClickHouse SQL via `to_clickhouse::translate()`.
- Inject tombstone predicates.
- Execute via `query_port.execute_sql()`.
- Parse JSONEachRow results to `Vec<SeriesResult>`.
7. File Source Resolution¶
File: src/application/query_service.rs
resolve_file_source()¶
Determines which Parquet files will be queried and returns a glob pattern or explicit file list for chDB's file() function.
Strategy¶
- Query `metadata.get_parquet_files(db, rp, measurement, min_time, max_time)` to get paths overlapping the time range.
- Return the count and a file source string.
Chunked execution threshold¶
If file_count > CHUNK_FILE_THRESHOLD (50) and the time span exceeds compute_chunk_nanos(file_count), the query uses chunked execution (see section 12).
8. InfluxQL to ClickHouse Translation¶
File: src/influxql/to_clickhouse.rs
translate(stmt, parquet_glob) -> String¶
Converts a SelectStatement AST into ClickHouse SQL that queries Parquet files via the file() table function.
Translation pipeline¶
SelectStatement
|
v
SELECT clause
- Time bucket: toStartOfInterval()
- Aggregate function mapping
- Transform -> window functions
- fill(N) -> ifNull() wrapping
- Arithmetic expressions
- Aliases
|
v
FROM clause
- file('{glob}', Parquet)
- Subqueries become inline SELECTs
|
v
WHERE clause
- now() -> now64()
- Duration -> INTERVAL
- Epoch literals -> fromUnixTimestamp64*
- Regex =~ -> match()
- Tombstone predicates
|
v
GROUP BY clause
- time(5m) -> toStartOfInterval()
- Tag dimensions
|
v
ORDER BY clause
- WITH FILL for fill modes
- INTERPOLATE for fill(previous) and fill(linear)
|
v
LIMIT / OFFSET
Aggregate function mapping¶
| InfluxQL | ClickHouse SQL |
|---|---|
MEAN(f) | avg(f) |
MEDIAN(f) | median(f) |
COUNT(f) | count(f) |
SUM(f) | sum(f) |
MIN(f) | min(f) |
MAX(f) | max(f) |
FIRST(f) | argMin(f, time) |
LAST(f) | argMax(f, time) |
PERCENTILE(f, N) | quantile(N/100.0)(f) |
SPREAD(f) | (max(f) - min(f)) |
STDDEV(f) | stddevPop(f) |
MODE(f) | topKWeighted(1)(f, 1) |
DISTINCT(f) | DISTINCT f |
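The single-argument rows of this table can be sketched as one mapping function (illustrative: `map_aggregate` is a hypothetical name, and multi-argument functions such as PERCENTILE are handled separately):

```rust
// Map an InfluxQL aggregate name plus field to its ClickHouse SQL
// form, per the table above. Unlisted single-argument functions
// (COUNT, SUM, MIN, MAX, MEDIAN) map to their lowercase namesake.
fn map_aggregate(func: &str, field: &str) -> String {
    match func.to_uppercase().as_str() {
        "MEAN" => format!("avg({})", field),
        "FIRST" => format!("argMin({}, time)", field),
        "LAST" => format!("argMax({}, time)", field),
        "SPREAD" => format!("(max({f}) - min({f}))", f = field),
        "STDDEV" => format!("stddevPop({})", field),
        other => format!("{}({})", other.to_lowercase(), field),
    }
}
```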
Transform function mapping (window functions)¶
| InfluxQL | ClickHouse SQL |
|---|---|
DERIVATIVE(f, unit) | (f - lagInFrame(f, 1) OVER (ORDER BY __time)) / nullIf(dateDiff('unit', lagInFrame(__time, 1) OVER ..., __time), 0) * scale |
NON_NEGATIVE_DERIVATIVE(f, unit) | Same as DERIVATIVE, wrapped in greatest(..., 0) |
DIFFERENCE(f) | f - lagInFrame(f, 1) OVER (ORDER BY __time) |
NON_NEGATIVE_DIFFERENCE(f) | greatest(f - lagInFrame(f, 1) OVER ..., 0) |
MOVING_AVERAGE(f, N) | avg(f) OVER (ORDER BY __time ROWS BETWEEN N-1 PRECEDING AND CURRENT ROW) |
CUMULATIVE_SUM(f) | sum(f) OVER (ORDER BY __time ROWS UNBOUNDED PRECEDING) |
ELAPSED(f, unit) | dateDiff('unit', lagInFrame(__time, 1) OVER ..., __time) |
When transforms wrap aggregates (e.g., derivative(mean("value"), 1s)), the aggregate is first computed in a subquery, then the transform is applied as a window function in the outer query.
Time bucket translation¶
-- InfluxQL: GROUP BY time(5m)
-- ClickHouse:
toStartOfInterval(time, INTERVAL 5 MINUTE) AS __time
-- InfluxQL: GROUP BY time(1h, 15m) (with offset)
-- ClickHouse:
toStartOfInterval(time - INTERVAL 15 MINUTE, INTERVAL 1 HOUR) + INTERVAL 15 MINUTE AS __time
The internal alias __time avoids collision with the raw time column. It is renamed back to time in the result parser.
Fill mode translation¶
| Fill Mode | ClickHouse Translation |
|---|---|
fill(null) | ORDER BY __time WITH FILL FROM ... TO ... STEP INTERVAL ... |
fill(none) | No WITH FILL clause |
fill(0) or fill(N) | ifNull(agg, N) + WITH FILL |
fill(previous) | WITH FILL + INTERPOLATE (col AS col) |
fill(linear) | WITH FILL + INTERPOLATE (col AS col USING LINEAR) |
Regex operator translation¶
| InfluxQL | ClickHouse |
|---|---|
"host" =~ /^us-.*/ | match(host, '^us-.*') |
"host" !~ /^eu-.*/ | NOT match(host, '^eu-.*') |
Time literal translation¶
Epoch timestamps with unit suffixes are translated to ClickHouse temporal functions:
| InfluxQL | ClickHouse |
|---|---|
1609459200000000000 (bare integer) | fromUnixTimestamp64Nano(1609459200000000000) |
1609459200000ms | fromUnixTimestamp64Milli(1609459200000) |
now() | now64() |
now() - 1h | now64() - INTERVAL 1 HOUR |
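The literal translation can be sketched as a small mapping function. The Nano and Milli cases come from the table above; the Micro and seconds cases are assumptions about how the remaining suffixes might be handled:

```rust
// Translate a bare or suffixed epoch literal into a ClickHouse
// temporal expression. Nano and Milli follow the doc's table;
// Micro and the seconds case are illustrative assumptions.
fn epoch_literal_to_sql(value: i64, suffix: Option<&str>) -> String {
    match suffix {
        None | Some("ns") => format!("fromUnixTimestamp64Nano({})", value),
        Some("u") | Some("us") => format!("fromUnixTimestamp64Micro({})", value),
        Some("ms") => format!("fromUnixTimestamp64Milli({})", value),
        Some("s") => format!("toDateTime({})", value),
        Some(other) => format!("{}{}", value, other),
    }
}
```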
9. Tombstone Injection¶
File: src/application/query_service.rs
inject_tombstone_predicates()¶
Before executing a SELECT, the query service loads all tombstones for the measurement. For each tombstone (id, predicate_sql), the generated WHERE clause is extended so that rows matching predicate_sql are excluded.
This ensures deleted data is excluded at query time without modifying the underlying Parquet files.
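A sketch of the injection, illustrative only: the exact clause shape is not shown in this document, so this assumes each tombstone predicate is negated and AND-ed onto the generated WHERE clause.

```rust
// Append one negated predicate per tombstone to the WHERE clause.
// The "AND NOT (...)" form is an assumption for illustration.
fn inject_tombstones(mut where_clause: String, predicates: &[String]) -> String {
    for p in predicates {
        where_clause.push_str(&format!(" AND NOT ({})", p));
    }
    where_clause
}
```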
10. chDB Execution¶
Files: src/adapters/chdb/query_adapter.rs, src/adapters/chdb/pool.rs
Session management¶
chDB sessions are Send but not Sync, so each is wrapped in Arc<Mutex<Session>>. All queries execute in spawn_blocking since chDB is synchronous.
Single session (ChdbQueryAdapter)¶
One Session instance wrapped in Arc<Mutex<Session>>. Every query contends on the same mutex.
Session pool (ChdbPool)¶
Multiple sessions in Vec<Arc<Mutex<Session>>>, selected round-robin via AtomicUsize. Each session has its own data directory ({base}/session_{i}).
Optional concurrency limit via Arc<Semaphore> (from max_concurrent_queries config). When set, queries acquire a semaphore permit before obtaining a session.
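The round-robin selection can be sketched as follows (field and type names are illustrative, not the real API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Lock-free round-robin index selection over a pool of sessions.
struct SessionPool {
    next: AtomicUsize,
    size: usize,
}

impl SessionPool {
    fn pick_index(&self) -> usize {
        // fetch_add returns the previous value, so successive calls
        // cycle 0, 1, ..., size-1, 0, ...
        self.next.fetch_add(1, Ordering::Relaxed) % self.size
    }
}
```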
Query execution flow¶
- Acquire semaphore permit (if configured).
- Select session via round-robin.
- Inside `spawn_blocking`: `session.blocking_lock()`, then `session.execute(sql, OutputFormat::JSONEachRow)`.
- Return the UTF-8 result string.
- Handle file-related errors gracefully:
  - `CANNOT_EXTRACT_TABLE_STRUCTURE`, `no files`, `CANNOT_STAT` -> treated as empty result (no data, not an error).
  - Other errors -> `HyperbytedbError::Chdb`.
Output format¶
All queries use OutputFormat::JSONEachRow -- one JSON object per result row:
{"__time":"2024-01-15 10:00:00","host":"server01","mean_usage_idle":42.5}
{"__time":"2024-01-15 10:05:00","host":"server01","mean_usage_idle":38.2}
11. Result Parsing¶
File: src/application/query_service.rs
parse_json_each_row_to_series(raw, measurement, epoch, group_by_tags)¶
Transforms chDB's JSONEachRow output into InfluxDB v1 series format.
Steps¶
- Split the raw string by newlines.
- Parse each line as a JSON object.
- Rename `__time` to `time`.
- Convert ClickHouse datetime strings (`"2024-01-15 10:00:00"`) to timestamps.
- Apply `epoch` parameter formatting:
  - `None` or `""` -> RFC3339 string (`"2024-01-15T10:00:00Z"`)
  - `"ns"` -> nanosecond integer
  - `"us"` / `"u"` -> microsecond integer
  - `"ms"` -> millisecond integer
  - `"s"` -> second integer
- Group rows by tag combination:
  - Extract tag values from GROUP BY tag columns.
  - Rows with the same tag combination form one `SeriesResult`.
- Each `SeriesResult` contains:
  - `name`: measurement name
  - `tags`: `HashMap<String, String>` (tag key-value pairs)
  - `columns`: list of column names
  - `values`: 2D array of values
normalize_time_value(value, epoch)¶
Handles the conversion of various time representations:
- ClickHouse datetime strings -> parsed to nanoseconds -> formatted per epoch
- Integer timestamps -> scaled and formatted per epoch
- Null times -> preserved as null
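The integer-scaling branch can be sketched as follows (illustrative: `scale_epoch` is a hypothetical helper, and the RFC3339 string-formatting path is represented by the `None` return):

```rust
// Scale a nanosecond timestamp to the requested epoch unit.
// A `None` return means the RFC3339 formatting path applies instead.
fn scale_epoch(nanos: i64, epoch: &str) -> Option<i64> {
    match epoch {
        "ns" => Some(nanos),
        "us" | "u" => Some(nanos / 1_000),
        "ms" => Some(nanos / 1_000_000),
        "s" => Some(nanos / 1_000_000_000),
        _ => None,
    }
}
```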
12. Chunked Execution¶
File: src/application/query_service.rs
When chunked execution is used¶
For large datasets with many Parquet files, scanning everything in a single chDB query can be slow. Chunked execution splits the time range into smaller windows and executes them in parallel.
Trigger conditions¶
- `file_count > CHUNK_FILE_THRESHOLD` (50)
- Time span exceeds `compute_chunk_nanos(file_count)` -- dynamically computed based on file density
Algorithm¶
- Split the `[min_time, max_time]` range into chunks of `chunk_nanos` each.
- For each chunk, build a separate ClickHouse SQL query scoped to that time sub-range.
- Execute all chunks in parallel via `tokio::spawn`.
- Concatenate JSON results from all chunks.
- Parse the concatenated JSON into series.
- Sort series values by time.
This approach exploits I/O parallelism since each chunk reads a different subset of Parquet files.
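The range-splitting step can be sketched as follows (a minimal sketch; the real code presumably works on the same nanosecond bounds extracted from the WHERE clause):

```rust
// Split [min_time, max_time] into half-open windows of at most
// chunk_nanos nanoseconds each; the last window may be shorter.
fn split_chunks(min_time: i64, max_time: i64, chunk_nanos: i64) -> Vec<(i64, i64)> {
    let mut chunks = Vec::new();
    let mut start = min_time;
    while start < max_time {
        let end = (start + chunk_nanos).min(max_time);
        chunks.push((start, end));
        start = end;
    }
    chunks
}
```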
13. SLIMIT and SOFFSET¶
After all series are collected, SLIMIT and SOFFSET are applied for series-level pagination:
- `SOFFSET N` -- skip the first N series
- `SLIMIT M` -- return at most M series
This is applied after query execution since the number of series is only known after result parsing (each unique tag combination produces one series).
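The pagination itself reduces to a skip/take over the collected series (a sketch; the element type is simplified to `String` for illustration):

```rust
// Apply SOFFSET then SLIMIT over the collected series list.
fn apply_slimit_soffset(
    series: Vec<String>,
    soffset: Option<usize>,
    slimit: Option<usize>,
) -> Vec<String> {
    let iter = series.into_iter().skip(soffset.unwrap_or(0));
    match slimit {
        Some(n) => iter.take(n).collect(),
        None => iter.collect(),
    }
}
```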
14. SELECT INTO¶
write_series_as_points()¶
When a SELECT statement includes an INTO clause (SELECT ... INTO target_meas ...), query results are written back to the database as new points:
- Execute the SELECT query normally.
- For each result series:
  - Convert each row back to a `Point` with the target measurement name.
  - Tags from GROUP BY are preserved.
  - Field values come from the selected columns.
- Append all points to the WAL via `wal.append(WalEntry { ... })`.
- Return a result series showing the number of points written.
15. Cluster Query Behavior¶
File: src/application/peer_query_service.rs
PeerQueryService¶
In cluster mode, PeerQueryService wraps the local QueryServiceImpl. It adds Raft-based replication for schema-mutating statements.
Non-mutating queries (SELECT, SHOW)¶
Passed directly to the local QueryServiceImpl. Each node queries only its own Parquet files -- there is no distributed query fan-out or scatter-gather for data queries.
Mutating statements (CREATE, DROP, DELETE, etc.)¶
- Execute locally via `inner.execute_query()`.
- If successful, replicate via `replicate_mutation()`:
  - If Raft is configured: route through `raft.client_write(ClusterRequest::SchemaMutation(...))` for consensus.
  - Otherwise: broadcast via `peer_client.replicate_mutation()`.
- Replication errors are captured in the `StatementResult.error` field but do not prevent the local operation.
is_cluster_mutation(stmt) detection¶
Returns true for: CreateDatabase, DropDatabase, DropMeasurement, CreateRetentionPolicy, AlterRetentionPolicy, DropRetentionPolicy, CreateUser, DropUser, SetPassword, Delete, CreateContinuousQuery, DropContinuousQuery.
16. Metrics¶
Query metrics¶
| Metric | Type | Description |
|---|---|---|
hyperbytedb_query_requests_total | counter | Total query requests |
hyperbytedb_query_errors_total | counter | Failed queries |
hyperbytedb_query_duration_seconds | histogram | Query execution latency |
hyperbytedb_write_requests_total | counter | Write requests (for SELECT INTO) |