Extension Points¶
Step-by-step guides for extending HyperbyteDB with new functionality.
Adding a New InfluxQL Statement¶
Step 1: Define the AST node¶
In src/influxql/ast.rs, add a variant to the Statement enum:
Step 2: Parse it¶
In src/influxql/parser.rs, add a dispatch case in parse_statement():
fn parse_statement(input: &str) -> Result<Statement, HyperbytedbError> {
let first = first_token(input);
match first.to_uppercase().as_str() {
"NEW" => parse_new_statement(input),
// existing cases...
}
}
Implement parse_new_statement() following the existing recursive descent patterns.
Step 3: Execute it¶
In src/application/query_service.rs, add a match arm in execute_statement():
Statement::NewStatement(data) => {
// Execute the statement logic using metadata, WAL, or other ports
// Return StatementResult
}
Step 4: Translate to ClickHouse (if SELECT-like)¶
If the statement involves data queries, add translation logic in src/influxql/to_clickhouse.rs.
Step 5: Handle cluster replication (if mutating)¶
If the statement modifies state: 1. Add it to is_cluster_mutation() in src/application/peer_query_service.rs. 2. Add a MutationRequest variant in src/cluster/types.rs. 3. Add application logic in src/cluster/raft/state_machine.rs.
Adding a New Storage Backend¶
Step 1: Implement StoragePort¶
Create a new file in src/adapters/storage/:
pub struct NewStorage { /* ... */ }
#[async_trait]
impl StoragePort for NewStorage {
async fn write_parquet(&self, path: &str, data: Bytes) -> Result<(), HyperbytedbError> { /* ... */ }
async fn read_parquet(&self, path: &str) -> Result<Bytes, HyperbytedbError> { /* ... */ }
async fn list_parquet_files(&self, prefix: &str) -> Result<Vec<String>, HyperbytedbError> { /* ... */ }
async fn delete_parquet(&self, path: &str) -> Result<(), HyperbytedbError> { /* ... */ }
async fn prepare_parquet_stream(&self, path: &str) -> Result<ParquetStreamSource, HyperbytedbError> { /* ... */ }
}
The path parameter is always relative. The backend adds its own prefix.
Step 2: Wire it in bootstrap¶
In src/bootstrap.rs, add a match arm:
let storage: Arc<dyn StoragePort> = match config.storage.backend.as_str() {
"s3" => Arc::new(S3Storage::new(&config.storage.s3.unwrap())?),
"new_backend" => Arc::new(NewStorage::new(&config)?),
_ => Arc::new(LocalStorage::new(&config.storage.data_dir)),
};
Step 3: Add config¶
Add config fields in src/config.rs and document in docs/user-guide/configuration.md.
Adding a New Background Service¶
Background services follow a consistent pattern. Use RetentionService as the simplest reference.
Step 1: Define the service¶
pub struct NewService {
metadata: Arc<dyn MetadataPort>,
}
impl NewService {
pub fn new(metadata: Arc<dyn MetadataPort>) -> Self {
Self { metadata }
}
pub async fn run(
&self,
interval: Duration,
mut shutdown_rx: watch::Receiver<bool>,
) {
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = ticker.tick() => {
if let Err(e) = self.do_work().await {
tracing::error!("new service error: {}", e);
counter!("hyperbytedb_new_service_errors_total").increment(1);
}
}
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
tracing::info!("new service shutting down");
break;
}
}
}
}
}
async fn do_work(&self) -> Result<(), HyperbytedbError> {
// service logic here
Ok(())
}
}
Step 2: Wire in bootstrap and main¶
In src/bootstrap.rs or src/main.rs, create and spawn the service:
let new_svc = Arc::new(NewService::new(metadata.clone()));
let new_shutdown = shutdown_rx.clone();
let new_handle = tokio::spawn(async move {
new_svc.run(Duration::from_secs(30), new_shutdown).await;
});
// Await the handle during shutdown
Adding a New Port¶
Step 1: Define the trait¶
In src/ports/new_port.rs:
#[async_trait]
pub trait NewPort: Send + Sync {
async fn do_something(&self, input: &str) -> Result<Output, HyperbytedbError>;
}
Step 2: Register the module¶
Add pub mod new_port; to src/ports/mod.rs.
Step 3: Implement and wire¶
Create the adapter in src/adapters/ and wire it in src/bootstrap.rs.
Adding a New HTTP Endpoint¶
Step 1: Create the handler¶
In src/adapters/http/ (new file or existing):
pub async fn handle_new_endpoint(
State(state): State<Arc<AppState>>,
Query(params): Query<NewParams>,
) -> Result<impl IntoResponse, HyperbytedbError> {
// handler logic
Ok(Json(result))
}
Step 2: Register the route¶
In src/adapters/http/router.rs, add to build_router():
Step 3: Add any needed state¶
If the handler needs new shared state, add fields to AppState in router.rs and populate in bootstrap.rs.
Adding a New Adapter for an Existing Port¶
To replace or add an alternative implementation of an existing port:
- Create the implementation in
src/adapters/. - Implement the port trait.
- Add a config option to select the implementation.
- Wire it in
src/bootstrap.rswith a match on the config.
The system is designed so that swapping implementations requires changes only in bootstrap.rs and config — no business logic changes needed.
See Also¶
- Core Modules — Module reference
- Coding Standards — Code conventions to follow
- Contributing — Review process for changes