use crate::error::{TuskError, TuskResult};
use crate::models::query_result::QueryResult;
use serde::Deserialize;
use serde_json::{Map, Value};
use std::sync::LazyLock;
use std::time::{Duration, Instant};

/// Overall per-request timeout configured on the shared HTTP client.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);

/// Returns the process-wide `reqwest::Client`, built lazily on first use.
///
/// The client is configured with a 5 second connect timeout and
/// [`DEFAULT_TIMEOUT`] as the overall request timeout.
fn http_client() -> &'static reqwest::Client {
    static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(|| {
        reqwest::Client::builder()
            .connect_timeout(Duration::from_secs(5))
            .timeout(DEFAULT_TIMEOUT)
            .build()
            // If the builder fails, the fallback is `Client::default()`, which does not
            // carry the timeouts configured above.
            .unwrap_or_default()
    });
    &CLIENT
}

/// Connection settings for talking to a ClickHouse server over HTTP.
///
/// NOTE(review): `Debug` is derived, so formatting this struct with `{:?}` prints
/// `password` in clear text.
#[derive(Debug, Clone)]
pub struct ChClient {
    /// `scheme://host:port`, without a trailing slash.
    pub base_url: String,
    /// User name sent with every request.
    pub user: String,
    /// Password sent with every request; omitted from the request when empty.
    pub password: String,
    /// Database used when a request does not name another one.
    pub database: String,
}

impl ChClient {
    /// Builds a client for `host:port`, using `https` when `secure` is true and
    /// `http` otherwise.
    ///
    /// `host` is interpolated into the base URL as given.
    pub fn new(
        host: &str,
        port: u16,
        secure: bool,
        user: &str,
        password: &str,
        database: &str,
    ) -> Self {
        let scheme = if secure { "https" } else { "http" };
        Self {
            base_url: format!("{}://{}:{}", scheme, host, port),
            user: user.to_string(),
            password: password.to_string(),
            database: database.to_string(),
        }
    }

    /// Builds the request URL, including its query string.
    ///
    /// * `database` - overrides `self.database` when `Some`.
    /// * `format` - when `Some`, sent as the `default_format` parameter.
    /// * `read_only` - when true, `readonly=1` is appended.
    ///
    /// NOTE(review): the user name and password travel in the query string, so they
    /// can show up wherever URLs are logged.
    fn endpoint(&self, database: Option<&str>, format: Option<&str>, read_only: bool) -> String {
        let db = database.unwrap_or(&self.database);
        let mut params = vec![
            format!("database={}", urlencode(db)),
            format!("user={}", urlencode(&self.user)),
        ];
        // An empty password is left out entirely rather than sent as `password=`.
        if !self.password.is_empty() {
            params.push(format!("password={}", urlencode(&self.password)));
        }
        if let Some(fmt) = format {
            params.push(format!("default_format={}", urlencode(fmt)));
        }
        if read_only {
            params.push("readonly=1".to_string());
        }
        format!("{}/?{}", self.base_url, params.join("&"))
    }

    /// Execute SQL and return raw response body.
///
    /// `sql` is sent as the body of a POST request to the URL built by `endpoint`,
    /// using the client's default database. `format` and `read_only` are forwarded
    /// to `endpoint` unchanged.
    ///
    /// # Errors
    ///
    /// Returns `TuskError::Custom` when the request cannot be sent, when the response
    /// body cannot be read, or when the server answers with a non-success status
    /// (the message then carries the status and the trimmed response body).
    pub async fn execute_raw(
        &self,
        sql: &str,
        format: Option<&str>,
        read_only: bool,
    ) -> TuskResult<String> {
        let url = self.endpoint(None, format, read_only);
        let resp = http_client()
            .post(&url)
            .body(sql.to_string())
            .send()
            .await
            .map_err(|e| TuskError::Custom(format!("ClickHouse request failed: {}", e)))?;

        // Capture the status first: reading the body consumes the response.
        let status = resp.status();
        let body = resp.text().await.map_err(|e| {
            TuskError::Custom(format!("Failed to read ClickHouse response: {}", e))
        })?;

        if !status.is_success() {
            return Err(TuskError::Custom(format!(
                "ClickHouse error ({}): {}",
                status,
                body.trim()
            )));
        }
        Ok(body)
    }

    /// Test the connection by running `SELECT version()` and return the server
    /// version string (the trimmed response body).
    ///
    /// # Errors
    ///
    /// Propagates any error from `execute_raw`.
    pub async fn ping(&self) -> TuskResult<String> {
        // TabSeparated keeps the single-cell result free of any JSON wrapping.
        let body = self
            .execute_raw("SELECT version()", Some("TabSeparated"), false)
            .await?;
        Ok(body.trim().to_string())
    }

    /// Execute SQL and parse rows via JSONCompact to preserve column metadata + types.
    ///
    /// `execution_time_ms` covers the HTTP round trip, including reading the body,
    /// but not JSON parsing. A blank response body yields a result with no columns
    /// and zero rows.
    ///
    /// # Errors
    ///
    /// Propagates errors from `execute_raw`, and returns `TuskError::Custom` when the
    /// body is not a valid JSONCompact document; that message includes the first 200
    /// characters of the body.
    pub async fn execute_query(&self, sql: &str, read_only: bool) -> TuskResult<QueryResult> {
        let start = Instant::now();
        let body = self.execute_raw(sql, Some("JSONCompact"), read_only).await?;
        // `as_millis` yields a u128; saturate instead of silently truncating.
        let execution_time_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

        // Empty body for statements without result set (DDL etc.) — return zero rows
        if body.trim().is_empty() {
            return Ok(QueryResult {
                columns: vec![],
                types: vec![],
                rows: vec![],
                row_count: 0,
                execution_time_ms,
            });
        }

        let parsed: ChJsonCompactResponse = serde_json::from_str(&body).map_err(|e| {
            TuskError::Custom(format!(
                "Failed to parse ClickHouse JSONCompact response: {} (body head: {})",
                e,
                body.chars().take(200).collect::<String>()
            ))
        })?;

        let columns: Vec<String> = parsed.meta.iter().map(|m| m.name.clone()).collect();
        let types: Vec<String> = parsed.meta.iter().map(|m| m.r#type.clone()).collect();
        // Read the length before `parsed.data` is moved into the result.
        let row_count = parsed.data.len();

        Ok(QueryResult {
            columns,
            types,
            rows: parsed.data,
            row_count,
            execution_time_ms,
        })
    }

    /// Execute SQL expecting result rows as objects (for schema introspection helpers).
///
    /// The query is run with the `JSONEachRow` format and the response is parsed one
    /// line at a time. Blank lines are ignored, and lines that parse to something
    /// other than a JSON object are skipped.
    ///
    /// # Errors
    ///
    /// Propagates errors from `execute_raw`, and returns `TuskError::Custom` for the
    /// first line that is not valid JSON.
    pub async fn fetch_objects(&self, sql: &str) -> TuskResult<Vec<Map<String, Value>>> {
        let body = self.execute_raw(sql, Some("JSONEachRow"), false).await?;
        let mut out = Vec::new();
        for line in body.lines().map(str::trim).filter(|l| !l.is_empty()) {
            let value: Value = serde_json::from_str(line).map_err(|e| {
                TuskError::Custom(format!("Failed to parse JSONEachRow line: {}", e))
            })?;
            if let Value::Object(obj) = value {
                out.push(obj);
            }
        }
        Ok(out)
    }
}

/// The fields of a JSONCompact response that this module reads.
#[derive(Debug, Deserialize)]
struct ChJsonCompactResponse {
    /// One entry per result column.
    meta: Vec<ChMetaEntry>,
    /// Result rows; each row is a list of cell values.
    data: Vec<Vec<Value>>,
}

/// Name and type of one result column, as listed under `meta`.
#[derive(Debug, Deserialize)]
struct ChMetaEntry {
    name: String,
    r#type: String,
}

/// Percent-encodes `s` for use as a URL query value.
///
/// Every byte of the UTF-8 encoding is written as `%XX` (upper-case hex) unless it
/// is an RFC 3986 unreserved character: ASCII letters, digits, `-`, `.`, `_`, `~`.
/// This covers control characters, quotes and non-ASCII text as well as the
/// reserved delimiters.
fn urlencode(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut out = String::with_capacity(s.len());
    for &byte in s.as_bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
            out.push(char::from(byte));
        } else {
            out.push('%');
            out.push(char::from(HEX[usize::from(byte >> 4)]));
            out.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    out
}