Deep Dive: Read Path

This document traces every step of the HyperbyteDB query path, from HTTP request to InfluxDB-compatible JSON response. It covers InfluxQL parsing, AST construction, ClickHouse SQL translation, chDB execution, result formatting, chunked execution for large datasets, and cluster query fan-out.


Table of Contents

  1. Overview
  2. HTTP Query Handler
  3. InfluxQL Parser
  4. AST Structure
  5. Query Dispatch
  6. SELECT Execution Flow
  7. File Source Resolution
  8. InfluxQL to ClickHouse Translation
  9. Tombstone Injection
  10. chDB Execution
  11. Result Parsing
  12. Chunked Execution
  13. SLIMIT and SOFFSET
  14. SELECT INTO
  15. Cluster Query Behavior
  16. Metrics

1. Overview

Client GET/POST /query
       |
       v
+------------------------------------+
| HTTP Handler (query.rs)            |
|  - Extract q, db, epoch, params    |
|  - Substitute bind parameters      |
|  - Delegate to QueryService        |
+------------------------------------+
       |
       v
+------------------------------------+
| QueryServiceImpl::execute_query()  |
|  - Parse InfluxQL -> Vec<Statement>|
|  - Timeout wrapper                 |
|  - Dispatch each statement         |
+------------------------------------+
       |
       v (SELECT statements)
+------------------------------------+
| execute_measurement_query()        |
|  - Resolve file source (paths/glob)|
|  - Translate InfluxQL -> CH SQL    |
|  - Inject tombstone predicates     |
|  - Execute via chDB                |
|  - Parse JSONEachRow -> Series     |
+------------------------------------+
       |
       v
+------------------------------------+
| chDB (embedded ClickHouse)         |
|  - file() table over Parquet files |
|  - Returns JSONEachRow             |
+------------------------------------+

2. HTTP Query Handler

File: src/adapters/http/query.rs

Entry points

  • handle_query_get() -- GET /query?q=...&db=...
  • handle_query_post() -- POST /query with form-encoded or query-string params

POST merges form body parameters into the query string parameters (body takes precedence).

Query parameters

| Parameter | Required | Default | Description |
|-----------|----------|---------|-------------|
| q | Yes | -- | InfluxQL query string |
| db | Depends | -- | Required for data queries; not for SHOW DATABASES |
| epoch | No | RFC3339 | Timestamp format: ns, us/u, ms, s |
| pretty | No | false | Pretty-print JSON |
| chunked | No | false | Stream results per statement |
| params | No | -- | JSON object for $param bind parameter substitution |

Bind parameter substitution

When params is provided (JSON object), $paramName placeholders in the query string are replaced with the corresponding values:

q = SELECT mean("value") FROM "cpu" WHERE "host" = $host
params = {"host": "'server01'"}
Result: SELECT mean("value") FROM "cpu" WHERE "host" = 'server01'
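As a sketch, the substitution can be implemented as ordered string replacement. This is illustrative only: the function name and the longest-name-first ordering (which prevents $host from clobbering $hostname) are assumptions, not the actual implementation.

```rust
use std::collections::HashMap;

/// Hypothetical sketch of $param substitution: longer parameter names are
/// replaced first so that $hostname is not partially matched by $host.
fn substitute_params(query: &str, params: &HashMap<String, String>) -> String {
    let mut names: Vec<&String> = params.keys().collect();
    names.sort_by_key(|n| std::cmp::Reverse(n.len()));
    let mut out = query.to_string();
    for name in names {
        out = out.replace(&format!("${}", name), &params[name.as_str()]);
    }
    out
}

fn main() {
    let mut params = HashMap::new();
    params.insert("host".to_string(), "'server01'".to_string());
    let q = substitute_params(
        "SELECT mean(\"value\") FROM \"cpu\" WHERE \"host\" = $host",
        &params,
    );
    assert_eq!(
        q,
        "SELECT mean(\"value\") FROM \"cpu\" WHERE \"host\" = 'server01'"
    );
    println!("{q}");
}
```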

Output formats

| Accept Header | Format |
|---------------|--------|
| application/json (default) | InfluxDB v1 JSON |
| text/csv or application/csv | CSV |

Statement summary tracking

If statement_summary.enabled = true, each executed statement is recorded with:

  • Statement type and normalized digest
  • Execution duration
  • Error status

Results are available via GET /api/v1/statements.


3. InfluxQL Parser

File: src/influxql/parser.rs

Architecture

The parser is a hand-rolled recursive descent parser with no parser generator dependency. It operates on string slices.

Parse flow

Input: "SELECT mean(\"value\") FROM \"cpu\"; SHOW DATABASES"
  |
  v
split_statements(";")
  |
  +----> "SELECT mean(\"value\") FROM \"cpu\""
  |        |
  |        v
  |      parse_statement()
  |        first token = "SELECT" -> parse_select()
  |        |
  |        v
  |      SelectStatement { fields, from, condition, ... }
  |
  +----> "SHOW DATABASES"
           |
           v
         parse_statement()
           first token = "SHOW" -> parse_show()
           |
           v
         Statement::ShowDatabases

Statement dispatch

The parser examines the first keyword (case-insensitive):

| First Token | Handler | Statement Types |
|-------------|---------|-----------------|
| SELECT | parse_select() | SelectStatement |
| SHOW | parse_show() | ShowDatabases, ShowMeasurements, ShowTagKeys, ShowTagValues, ShowFieldKeys, ShowSeries, ShowRetentionPolicies, ShowUsers, ShowContinuousQueries |
| CREATE | parse_create() | CreateDatabase, CreateRetentionPolicy, CreateUser, CreateContinuousQuery |
| DROP | parse_drop() | DropDatabase, DropMeasurement, DropRetentionPolicy, DropUser, DropContinuousQuery |
| DELETE | parse_delete() | Delete |
| ALTER | parse_alter() | AlterRetentionPolicy |
| SET | parse_set_password() | SetPassword |
| GRANT | parse_grant() | Grant |
| REVOKE | parse_revoke() | Revoke |

Expression parser

The SELECT field list and WHERE clause use a precedence-climbing expression parser:

| Precedence | Operators |
|------------|-----------|
| 1 (lowest) | OR |
| 2 | AND |
| 3 | =, !=, <>, <, <=, >, >=, =~, !~ |
| 4 | +, - |
| 5 | *, /, % |
| 6 (highest) | Unary -, NOT |
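The precedence-climbing technique can be illustrated with a minimal sketch. This is not HyperbyteDB's parser: it evaluates a pre-tokenized arithmetic expression directly instead of building an AST, and it covers only two precedence levels, but the climb over the min_prec threshold is the same idea.

```rust
// Minimal precedence-climbing sketch over pre-tokenized input (illustrative
// only; the real parser handles identifiers, durations, regexes, etc.).
#[derive(Clone, Copy)]
enum Tok {
    Num(f64),
    Op(char),
}

// Binding power for the operators this sketch supports.
fn prec(op: char) -> Option<u8> {
    match op {
        '+' | '-' => Some(4),
        '*' | '/' | '%' => Some(5),
        _ => None,
    }
}

fn parse_expr(toks: &[Tok], pos: &mut usize, min_prec: u8) -> f64 {
    // Parse an atom (here: only numeric literals).
    let mut lhs = match toks[*pos] {
        Tok::Num(n) => {
            *pos += 1;
            n
        }
        Tok::Op(_) => panic!("expected atom"),
    };
    // Climb: consume operators at or above the current minimum precedence.
    while *pos < toks.len() {
        let op = match toks[*pos] {
            Tok::Op(c) => c,
            Tok::Num(_) => break,
        };
        let p = match prec(op) {
            Some(p) if p >= min_prec => p,
            _ => break,
        };
        *pos += 1;
        let rhs = parse_expr(toks, pos, p + 1); // p + 1 => left-associative
        lhs = match op {
            '+' => lhs + rhs,
            '-' => lhs - rhs,
            '*' => lhs * rhs,
            '/' => lhs / rhs,
            '%' => lhs % rhs,
            _ => unreachable!(),
        };
    }
    lhs
}

fn main() {
    // 2 + 3 * 4 - 1 => 13 (multiplication binds tighter)
    let toks = [
        Tok::Num(2.0), Tok::Op('+'), Tok::Num(3.0), Tok::Op('*'),
        Tok::Num(4.0), Tok::Op('-'), Tok::Num(1.0),
    ];
    let mut pos = 0;
    assert_eq!(parse_expr(&toks, &mut pos, 1), 13.0);
}
```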

Atoms

Atom types recognized by parse_atom():

  • Identifiers: bare (cpu) or quoted ("cpu")
  • String literals: 'value'
  • Integer / float literals
  • Duration literals: 1h, 30s, 5m, 1d, 4w, 100ms, 500u, 1ns
  • now() function
  • Function calls: mean(...), derivative(...), count(*)
  • Wildcard: *
  • Regex: /pattern/
  • Subqueries: (SELECT ...)

SELECT parsing

parse_select() uses split_clauses() to find keyword boundaries (INTO, FROM, WHERE, GROUP BY, ORDER BY, LIMIT, OFFSET, SLIMIT, SOFFSET, TZ), then parses each clause independently.


4. AST Structure

File: src/influxql/ast.rs

Key types

Statement -- Top-level enum with variants for every supported statement.

SelectStatement -- The primary query type:

| Field | Type | Description |
|-------|------|-------------|
| fields | Vec<Field> | SELECT expressions with optional aliases |
| into | Option<Measurement> | INTO target measurement |
| from | Vec<MeasurementSource> | FROM source(s) |
| condition | Option<Expr> | WHERE clause |
| group_by | Option<GroupBy> | GROUP BY dimensions |
| order_by | Option<OrderBy> | ORDER BY |
| limit | Option<u64> | LIMIT |
| offset | Option<u64> | OFFSET |
| slimit | Option<u64> | SLIMIT (series limit) |
| soffset | Option<u64> | SOFFSET (series offset) |
| fill | Option<FillOption> | fill() option |
| timezone | Option<String> | TZ() |

Expr -- Recursive expression tree:

  • Identifier(String), Star, StringLiteral(String), IntegerLiteral(i64), FloatLiteral(f64)
  • DurationLiteral(Duration), RegexLiteral(String), BoolLiteral(bool)
  • Call(FunctionCall), BinaryExpr(Box<BinaryExpr>), UnaryExpr { op, expr }
  • Now, FieldRef { name, data_type }, Subquery(Box<SelectStatement>)

MeasurementSource -- Concrete(Measurement) or Subquery(Box<SelectStatement>)

GroupBy -- dimensions: Time(Duration, Option<Duration>), Tag(String), Regex(String)

FillOption -- Null, None, Previous, Linear, Value(f64)

Duration -- value: i64, unit: String with to_nanos() and to_clickhouse_interval()
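A hedged sketch of what the to_nanos() conversion might look like, assuming the unit strings listed in section 3 (ns, u/us, ms, s, m, h, d, w); the free-function signature is illustrative, not the actual method.

```rust
/// Illustrative unit-to-nanoseconds conversion for Duration values.
/// The unit strings follow the duration literals from section 3.
fn to_nanos(value: i64, unit: &str) -> i64 {
    let factor: i64 = match unit {
        "ns" => 1,
        "u" | "us" => 1_000,
        "ms" => 1_000_000,
        "s" => 1_000_000_000,
        "m" => 60 * 1_000_000_000,
        "h" => 3_600 * 1_000_000_000,
        "d" => 86_400 * 1_000_000_000,
        "w" => 7 * 86_400 * 1_000_000_000,
        _ => panic!("unknown duration unit: {unit}"),
    };
    value * factor
}

fn main() {
    assert_eq!(to_nanos(5, "m"), 300_000_000_000);
    assert_eq!(to_nanos(1, "w"), 604_800_000_000_000);
}
```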


5. Query Dispatch

File: src/application/query_service.rs

execute_query(db, query, epoch)

  1. Wrap the entire execution in tokio::time::timeout(query_timeout_secs).
  2. Parse the InfluxQL string via influxql::parse(query) -> Vec<Statement>.
  3. For each statement, call execute_statement() with a statement_id (0, 1, 2...).

execute_statement() routing

| Statement | Action |
|-----------|--------|
| ShowDatabases | metadata.list_databases() |
| ShowMeasurements | metadata.list_measurements(db) |
| ShowTagKeys | metadata.list_tag_keys(db, measurement) |
| ShowTagValues | metadata.list_tag_values(db, key, measurement) with key filtering (Eq, Neq, Regex, In, All) |
| ShowFieldKeys | metadata.get_measurement(db, m) -> field_types |
| ShowRetentionPolicies | metadata.list_retention_policies(db) |
| ShowSeries | metadata.list_series(db, measurement) |
| ShowUsers | metadata.list_users() |
| ShowContinuousQueries | metadata.list_continuous_queries() |
| CreateDatabase | metadata.create_database(name) |
| DropDatabase | metadata.drop_database(name) |
| DropMeasurement | metadata.drop_measurement(db, name) |
| CreateRetentionPolicy | metadata.create_retention_policy(db, rp) |
| AlterRetentionPolicy | metadata.alter_retention_policy(db, name, ...) |
| DropRetentionPolicy | metadata.drop_retention_policy(db, name) |
| CreateUser | metadata.create_user(username, password_hash, admin) |
| DropUser | metadata.drop_user(username) |
| SetPassword | metadata.set_password(username, new_hash) |
| Delete | metadata.store_tombstone(db, measurement, predicate_sql) |
| CreateContinuousQuery | metadata.create_continuous_query(db, cq_def) |
| DropContinuousQuery | metadata.drop_continuous_query(db, name) |
| Grant / Revoke | Parsed but no-op |
| Select | See next section |

6. SELECT Execution Flow

Overview

  1. Require non-empty db.
  2. Resolve the default retention policy via metadata.get_default_rp(db).
  3. Extract time bounds from the WHERE clause via to_clickhouse::extract_time_bounds().
  4. Determine GROUP BY tag dimensions for series grouping.
  5. Dispatch based on FROM source type.

Concrete measurement (single name)

Call execute_measurement_query() with the measurement name.

Regex measurement (FROM /^cpu.*/)

  1. List all measurements via metadata.list_measurements(db).
  2. Filter by regex using regex_pattern_matches().
  3. Execute execute_measurement_query() in parallel for each matching measurement via futures::future::join_all.
  4. Combine all series results.

Subquery (FROM (SELECT ...))

  1. Translate the inner SELECT to ClickHouse SQL.
  2. Wrap the outer SELECT using translate_with_source() with the inner SQL as the FROM clause.
  3. Execute the combined SQL via chDB.

execute_measurement_query()

  1. Call resolve_file_source() to determine file paths and count.
  2. Decide between single execution or chunked execution based on file count and time span.
  3. Translate InfluxQL to ClickHouse SQL via to_clickhouse::translate().
  4. Inject tombstone predicates.
  5. Execute via query_port.execute_sql().
  6. Parse JSONEachRow results to Vec<SeriesResult>.

7. File Source Resolution

File: src/application/query_service.rs

resolve_file_source()

Determines which Parquet files will be queried and returns a glob pattern or explicit file list for chDB's file() function.

Strategy

  1. Query metadata.get_parquet_files(db, rp, measurement, min_time, max_time) to get paths overlapping the time range.
  2. Return the count and a file source string.

Chunked execution threshold

If file_count > CHUNK_FILE_THRESHOLD (50) and the time span exceeds compute_chunk_nanos(file_count), the query uses chunked execution (see section 12).


8. InfluxQL to ClickHouse Translation

File: src/influxql/to_clickhouse.rs

translate(stmt, parquet_glob) -> String

Converts a SelectStatement AST into ClickHouse SQL that queries Parquet files via the file() table function.

Translation pipeline

SelectStatement
     |
     v
SELECT clause
  - Time bucket: toStartOfInterval()
  - Aggregate function mapping
  - Transform -> window functions
  - fill(N) -> ifNull() wrapping
  - Arithmetic expressions
  - Aliases
     |
     v
FROM clause
  - file('{glob}', Parquet)
  - Subqueries become inline SELECTs
     |
     v
WHERE clause
  - now() -> now64()
  - Duration -> INTERVAL
  - Epoch literals -> fromUnixTimestamp64*
  - Regex =~ -> match()
  - Tombstone predicates
     |
     v
GROUP BY clause
  - time(5m) -> toStartOfInterval()
  - Tag dimensions
     |
     v
ORDER BY clause
  - WITH FILL for fill modes
  - INTERPOLATE for fill(previous) and fill(linear)
     |
     v
LIMIT / OFFSET

Aggregate function mapping

| InfluxQL | ClickHouse SQL |
|----------|----------------|
| MEAN(f) | avg(f) |
| MEDIAN(f) | median(f) |
| COUNT(f) | count(f) |
| SUM(f) | sum(f) |
| MIN(f) | min(f) |
| MAX(f) | max(f) |
| FIRST(f) | argMin(f, time) |
| LAST(f) | argMax(f, time) |
| PERCENTILE(f, N) | quantile(N/100.0)(f) |
| SPREAD(f) | (max(f) - min(f)) |
| STDDEV(f) | stddevPop(f) |
| MODE(f) | topKWeighted(1)(f, 1) |
| DISTINCT(f) | DISTINCT f |
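Illustratively, this name mapping could be expressed as a plain function; the real translator builds these expressions from the AST and also handles extra arguments such as PERCENTILE(f, N), which this sketch omits.

```rust
/// Illustrative mapping of InfluxQL aggregate names to ClickHouse
/// expressions, following the table above. Not the actual translator.
fn map_aggregate(func: &str, field: &str) -> String {
    match func.to_ascii_uppercase().as_str() {
        "MEAN" => format!("avg({field})"),
        "FIRST" => format!("argMin({field}, time)"),
        "LAST" => format!("argMax({field}, time)"),
        "SPREAD" => format!("(max({field}) - min({field}))"),
        "STDDEV" => format!("stddevPop({field})"),
        // MEDIAN, COUNT, SUM, MIN, MAX map to the lowercased name directly.
        other => format!("{}({field})", other.to_ascii_lowercase()),
    }
}

fn main() {
    assert_eq!(map_aggregate("MEAN", "usage_idle"), "avg(usage_idle)");
    assert_eq!(map_aggregate("last", "value"), "argMax(value, time)");
}
```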

Transform function mapping (window functions)

| InfluxQL | ClickHouse SQL |
|----------|----------------|
| DERIVATIVE(f, unit) | (f - lagInFrame(f, 1) OVER (ORDER BY __time)) / nullIf(dateDiff('unit', lagInFrame(__time, 1) OVER ..., __time), 0) * scale |
| NON_NEGATIVE_DERIVATIVE(f, unit) | Same as DERIVATIVE, wrapped in greatest(..., 0) |
| DIFFERENCE(f) | f - lagInFrame(f, 1) OVER (ORDER BY __time) |
| NON_NEGATIVE_DIFFERENCE(f) | greatest(f - lagInFrame(f, 1) OVER ..., 0) |
| MOVING_AVERAGE(f, N) | avg(f) OVER (ORDER BY __time ROWS BETWEEN N-1 PRECEDING AND CURRENT ROW) |
| CUMULATIVE_SUM(f) | sum(f) OVER (ORDER BY __time ROWS UNBOUNDED PRECEDING) |
| ELAPSED(f, unit) | dateDiff('unit', lagInFrame(__time, 1) OVER ..., __time) |

When transforms wrap aggregates (e.g., derivative(mean("value"), 1s)), the aggregate is first computed in a subquery, then the transform is applied as a window function in the outer query.

Time bucket translation

-- InfluxQL: GROUP BY time(5m)
-- ClickHouse:
toStartOfInterval(time, INTERVAL 5 MINUTE) AS __time

-- InfluxQL: GROUP BY time(1h, 15m)  (with offset)
-- ClickHouse:
toStartOfInterval(time - INTERVAL 15 MINUTE, INTERVAL 1 HOUR) + INTERVAL 15 MINUTE AS __time

The internal alias __time avoids collision with the raw time column. It is renamed back to time in the result parser.

Fill mode translation

| Fill Mode | ClickHouse Translation |
|-----------|------------------------|
| fill(null) | ORDER BY __time WITH FILL FROM ... TO ... STEP INTERVAL ... |
| fill(none) | No WITH FILL clause |
| fill(0) or fill(N) | ifNull(agg, N) + WITH FILL |
| fill(previous) | WITH FILL + INTERPOLATE (col AS col) |
| fill(linear) | WITH FILL + INTERPOLATE (col AS col USING LINEAR) |

Regex operator translation

| InfluxQL | ClickHouse |
|----------|------------|
| "host" =~ /^us-.*/ | match(host, '^us-.*') |
| "host" !~ /^eu-.*/ | NOT match(host, '^eu-.*') |

Time literal translation

Epoch timestamps with unit suffixes are translated to ClickHouse temporal functions:

| InfluxQL | ClickHouse |
|----------|------------|
| 1609459200000000000 (bare integer) | fromUnixTimestamp64Nano(1609459200000000000) |
| 1609459200000ms | fromUnixTimestamp64Milli(1609459200000) |
| now() | now64() |
| now() - 1h | now64() - INTERVAL 1 HOUR |

9. Tombstone Injection

File: src/application/query_service.rs

inject_tombstone_predicates()

Before executing a SELECT, the query service loads all tombstones for the measurement:

let tombstones = metadata.list_tombstones(db, measurement).await?;

For each tombstone (id, predicate_sql), the generated WHERE clause is extended with:

AND NOT (predicate_sql)

This ensures deleted data is excluded at query time without modifying the underlying Parquet files.
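A minimal sketch of the predicate extension (the helper name and signature are illustrative, not the actual function):

```rust
/// Illustrative sketch: extend a WHERE clause with "AND NOT (...)" for
/// each tombstone predicate, excluding deleted rows at query time.
fn inject_tombstones(where_clause: &str, predicates: &[&str]) -> String {
    let mut out = where_clause.to_string();
    for p in predicates {
        out.push_str(&format!(" AND NOT ({p})"));
    }
    out
}

fn main() {
    let sql = inject_tombstones("time > 0", &["host = 'a'", "region = 'eu'"]);
    assert_eq!(sql, "time > 0 AND NOT (host = 'a') AND NOT (region = 'eu')");
    println!("{sql}");
}
```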


10. chDB Execution

Files: src/adapters/chdb/query_adapter.rs, src/adapters/chdb/pool.rs

Session management

chDB sessions are Send but not Sync, so each is wrapped in Arc<Mutex<Session>>. All queries execute in spawn_blocking since chDB is synchronous.

Single session (ChdbQueryAdapter)

One Session instance wrapped in Arc<Mutex<Session>>. Every query contends on the same mutex.

Session pool (ChdbPool)

Multiple sessions in Vec<Arc<Mutex<Session>>>, selected round-robin via AtomicUsize. Each session has its own data directory ({base}/session_{i}).

Optional concurrency limit via Arc<Semaphore> (from max_concurrent_queries config). When set, queries acquire a semaphore permit before obtaining a session.
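The round-robin selection can be sketched with std primitives standing in for tokio's Mutex and Semaphore; the Session type below is a stand-in for chdb's session, and the semaphore step is omitted.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Stand-in for chdb's session type (illustrative only).
struct Session {
    id: usize,
}

/// Sketch of the session pool: a fixed set of mutex-wrapped sessions
/// selected round-robin via an atomic counter.
struct Pool {
    sessions: Vec<Arc<Mutex<Session>>>,
    next: AtomicUsize,
}

impl Pool {
    fn new(n: usize) -> Self {
        Pool {
            sessions: (0..n)
                .map(|id| Arc::new(Mutex::new(Session { id })))
                .collect(),
            next: AtomicUsize::new(0),
        }
    }

    /// Each call hands back the next session in turn.
    fn checkout(&self) -> Arc<Mutex<Session>> {
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.sessions.len();
        Arc::clone(&self.sessions[i])
    }
}

fn main() {
    let pool = Pool::new(3);
    let picked: Vec<usize> = (0..4)
        .map(|_| pool.checkout().lock().unwrap().id)
        .collect();
    assert_eq!(picked, vec![0, 1, 2, 0]); // wraps around after 3 sessions
}
```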

Query execution flow

  1. Acquire a semaphore permit (if configured).
  2. Select a session via round-robin.
  3. Inside spawn_blocking:
       • session.blocking_lock()
       • session.execute(sql, OutputFormat::JSONEachRow)
       • Return the UTF-8 result string.
  4. Handle file-related errors gracefully:
       • CANNOT_EXTRACT_TABLE_STRUCTURE, "no files", and CANNOT_STAT are treated as an empty result (no data, not an error).
       • Other errors map to HyperbytedbError::Chdb.

Output format

All queries use OutputFormat::JSONEachRow -- one JSON object per result row:

{"__time":"2024-01-15 10:00:00","host":"server01","mean_usage_idle":42.5}
{"__time":"2024-01-15 10:05:00","host":"server01","mean_usage_idle":38.2}

11. Result Parsing

File: src/application/query_service.rs

parse_json_each_row_to_series(raw, measurement, epoch, group_by_tags)

Transforms chDB's JSONEachRow output into InfluxDB v1 series format.

Steps

  1. Split the raw string by newlines.
  2. Parse each line as a JSON object.
  3. Rename __time to time.
  4. Convert ClickHouse datetime strings ("2024-01-15 10:00:00") to timestamps.
  5. Apply epoch parameter formatting:
       • None or "" -> RFC3339 string ("2024-01-15T10:00:00Z")
       • "ns" -> nanosecond integer
       • "us" / "u" -> microsecond integer
       • "ms" -> millisecond integer
       • "s" -> second integer
  6. Group rows by tag combination:
       • Extract tag values from the GROUP BY tag columns.
       • Rows with the same tag combination form one SeriesResult.
  7. Each SeriesResult contains:
       • name: measurement name
       • tags: HashMap<String, String> (tag key-value pairs)
       • columns: list of column names
       • values: 2D array of values

normalize_time_value(value, epoch)

Handles the conversion of various time representations:

  • ClickHouse datetime strings -> parsed to nanoseconds -> formatted per epoch
  • Integer timestamps -> scaled and formatted per epoch
  • Null times -> preserved as null
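A sketch of the integer-scaling branch of this conversion; RFC3339 formatting (the default) is omitted to keep the example dependency-free, and the function name is illustrative.

```rust
/// Illustrative scaling of a nanosecond timestamp per the epoch parameter.
/// The default (empty epoch) would produce an RFC3339 string instead.
fn scale_epoch(nanos: i64, epoch: &str) -> i64 {
    match epoch {
        "ns" => nanos,
        "u" | "us" => nanos / 1_000,
        "ms" => nanos / 1_000_000,
        "s" => nanos / 1_000_000_000,
        _ => nanos, // unknown units left unscaled in this sketch
    }
}

fn main() {
    let t = 1_609_459_200_000_000_000_i64; // 2021-01-01T00:00:00Z in ns
    assert_eq!(scale_epoch(t, "ms"), 1_609_459_200_000);
    assert_eq!(scale_epoch(t, "s"), 1_609_459_200);
}
```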


12. Chunked Execution

File: src/application/query_service.rs

When chunked execution is used

For large datasets with many Parquet files, scanning everything in a single chDB query can be slow. Chunked execution splits the time range into smaller windows and executes them in parallel.

Trigger conditions

  • file_count > CHUNK_FILE_THRESHOLD (50)
  • Time span exceeds compute_chunk_nanos(file_count) -- dynamically computed based on file density

Algorithm

  1. Split the [min_time, max_time] range into chunks of chunk_nanos each.
  2. For each chunk, build a separate ClickHouse SQL query scoped to that time sub-range.
  3. Execute all chunks in parallel via tokio::spawn.
  4. Concatenate JSON results from all chunks.
  5. Parse the concatenated JSON into series.
  6. Sort series values by time.

This approach exploits I/O parallelism since each chunk reads a different subset of Parquet files.
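The range-splitting step can be sketched as follows; note that the real compute_chunk_nanos() derives the chunk width dynamically from file density, which this sketch takes as a given.

```rust
/// Illustrative split of [min_time, max_time) into windows of at most
/// chunk_nanos; the final chunk is clamped to max_time.
fn chunk_ranges(min_time: i64, max_time: i64, chunk_nanos: i64) -> Vec<(i64, i64)> {
    assert!(chunk_nanos > 0);
    let mut chunks = Vec::new();
    let mut start = min_time;
    while start < max_time {
        let end = (start + chunk_nanos).min(max_time);
        chunks.push((start, end));
        start = end;
    }
    chunks
}

fn main() {
    // Each tuple becomes one time-scoped ClickHouse query run in parallel.
    assert_eq!(chunk_ranges(0, 10, 4), vec![(0, 4), (4, 8), (8, 10)]);
}
```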


13. SLIMIT and SOFFSET

After all series are collected, SLIMIT and SOFFSET are applied for series-level pagination:

  • SOFFSET N -- skip the first N series
  • SLIMIT M -- return at most M series

This is applied after query execution since the number of series is only known after result parsing (each unique tag combination produces one series).
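This pagination amounts to a skip/take over the collected series; a sketch (the function name is illustrative):

```rust
/// Illustrative series-level pagination: SOFFSET skips the first N series,
/// SLIMIT caps the number returned.
fn apply_slimit_soffset<T>(
    series: Vec<T>,
    soffset: Option<usize>,
    slimit: Option<usize>,
) -> Vec<T> {
    series
        .into_iter()
        .skip(soffset.unwrap_or(0))
        .take(slimit.unwrap_or(usize::MAX))
        .collect()
}

fn main() {
    let series = vec!["cpu,host=a", "cpu,host=b", "cpu,host=c"];
    // SOFFSET 1 SLIMIT 1 -> the second series only
    assert_eq!(
        apply_slimit_soffset(series, Some(1), Some(1)),
        vec!["cpu,host=b"]
    );
}
```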


14. SELECT INTO

write_series_as_points()

When a SELECT statement includes an INTO clause (SELECT ... INTO target_meas ...), query results are written back to the database as new points:

  1. Execute the SELECT query normally.
  2. For each result series:
       • Convert each row back to a Point with the target measurement name.
       • Tags from GROUP BY are preserved.
       • Field values come from the selected columns.
  3. Append all points to the WAL via wal.append(WalEntry { ... }).
  4. Return a result series showing the number of points written.

15. Cluster Query Behavior

File: src/application/peer_query_service.rs

PeerQueryService

In cluster mode, PeerQueryService wraps the local QueryServiceImpl. It adds Raft-based replication for schema-mutating statements.

Non-mutating queries (SELECT, SHOW)

Passed directly to the local QueryServiceImpl. Each node queries only its own Parquet files -- there is no distributed query fan-out or scatter-gather for data queries.

Mutating statements (CREATE, DROP, DELETE, etc.)

  1. Execute locally via inner.execute_query().
  2. If successful, replicate via replicate_mutation():
       • If Raft is configured: route through raft.client_write(ClusterRequest::SchemaMutation(...)) for consensus.
       • Otherwise: broadcast via peer_client.replicate_mutation().
  3. Replication errors are captured in the StatementResult.error field but do not prevent the local operation.

is_cluster_mutation(stmt) detection

Returns true for: CreateDatabase, DropDatabase, DropMeasurement, CreateRetentionPolicy, AlterRetentionPolicy, DropRetentionPolicy, CreateUser, DropUser, SetPassword, Delete, CreateContinuousQuery, DropContinuousQuery.


16. Metrics

Query metrics

| Metric | Type | Description |
|--------|------|-------------|
| hyperbytedb_query_requests_total | counter | Total query requests |
| hyperbytedb_query_errors_total | counter | Failed queries |
| hyperbytedb_query_duration_seconds | histogram | Query execution latency |
| hyperbytedb_write_requests_total | counter | Write requests (for SELECT INTO) |