use crate::error::{TuskError, TuskResult}; use crate::models::ai::{ AiProvider, AiSettings, OllamaChatMessage, OllamaChatRequest, OllamaChatResponse, OllamaModel, OllamaTagsResponse, }; use crate::state::{AppState, DbFlavor}; use sqlx::Row; use std::collections::{BTreeMap, HashMap}; use std::fs; use std::sync::Arc; use std::time::Duration; use tauri::{AppHandle, Manager, State}; const MAX_RETRIES: u32 = 2; const RETRY_DELAY_MS: u64 = 1000; fn http_client() -> &'static reqwest::Client { use std::sync::LazyLock; static CLIENT: LazyLock = LazyLock::new(|| { reqwest::Client::builder() .connect_timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(300)) .build() .unwrap_or_default() }); &CLIENT } fn get_ai_settings_path(app: &AppHandle) -> TuskResult { let dir = app .path() .app_data_dir() .map_err(|e| TuskError::Config(e.to_string()))?; fs::create_dir_all(&dir)?; Ok(dir.join("ai_settings.json")) } #[tauri::command] pub async fn get_ai_settings(app: AppHandle) -> TuskResult { let path = get_ai_settings_path(&app)?; if !path.exists() { return Ok(AiSettings::default()); } let data = fs::read_to_string(&path)?; let settings: AiSettings = serde_json::from_str(&data)?; Ok(settings) } #[tauri::command] pub async fn save_ai_settings( app: AppHandle, state: State<'_, Arc>, settings: AiSettings, ) -> TuskResult<()> { let path = get_ai_settings_path(&app)?; let data = serde_json::to_string_pretty(&settings)?; fs::write(&path, data)?; // Update in-memory cache *state.ai_settings.write().await = Some(settings); Ok(()) } #[tauri::command] pub async fn list_ollama_models(ollama_url: String) -> TuskResult> { let url = format!("{}/api/tags", ollama_url.trim_end_matches('/')); let resp = http_client().get(&url).send().await.map_err(|e| { TuskError::Ai(format!("Cannot connect to Ollama at {}: {}", ollama_url, e)) })?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); return Err(TuskError::Ai(format!( "Ollama error ({}): 
{}", status, body ))); } let tags: OllamaTagsResponse = resp .json() .await .map_err(|e| TuskError::Ai(format!("Failed to parse Ollama response: {}", e)))?; Ok(tags.models) } async fn call_ai_with_retry( _settings: &AiSettings, operation: &str, f: F, ) -> TuskResult where F: Fn() -> Fut, Fut: std::future::Future>, { let mut last_error = None; for attempt in 0..MAX_RETRIES { match f().await { Ok(result) => return Ok(result), Err(e) => { last_error = Some(e); if attempt < MAX_RETRIES - 1 { log::warn!( "{} failed (attempt {}/{}), retrying in {}ms...", operation, attempt + 1, MAX_RETRIES, RETRY_DELAY_MS ); tokio::time::sleep(Duration::from_millis(RETRY_DELAY_MS)).await; } } } } Err(last_error.unwrap_or_else(|| { TuskError::Ai(format!( "{} failed after {} attempts", operation, MAX_RETRIES )) })) } pub(crate) async fn load_ai_settings(app: &AppHandle, state: &AppState) -> TuskResult { // Try in-memory cache first if let Some(cached) = state.ai_settings.read().await.clone() { return Ok(cached); } // Fallback to disk let path = get_ai_settings_path(app)?; if !path.exists() { return Err(TuskError::Ai( "No AI model selected. Open AI settings to choose a model.".to_string(), )); } let data = fs::read_to_string(&path)?; let settings: AiSettings = serde_json::from_str(&data)?; // Populate cache for future calls *state.ai_settings.write().await = Some(settings.clone()); Ok(settings) } async fn call_ollama_chat( app: &AppHandle, state: &AppState, system_prompt: String, user_content: String, ) -> TuskResult { call_ollama_chat_messages( app, state, vec![ OllamaChatMessage { role: "system".to_string(), content: system_prompt, }, OllamaChatMessage { role: "user".to_string(), content: user_content, }, ], None, ) .await } pub(crate) async fn call_ollama_chat_messages( app: &AppHandle, state: &AppState, messages: Vec, format: Option, ) -> TuskResult { let settings = load_ai_settings(app, state).await?; if settings.model.is_empty() { return Err(TuskError::Ai( "No AI model selected. 
Open AI settings to choose a model.".to_string(), )); } if settings.provider != AiProvider::Ollama { return Err(TuskError::Ai(format!( "Provider {:?} not implemented yet", settings.provider ))); } let model = settings.model.clone(); let url = format!("{}/api/chat", settings.ollama_url.trim_end_matches('/')); let request = OllamaChatRequest { model: model.clone(), messages, stream: false, format, }; call_ai_with_retry(&settings, "Ollama request", || { let url = url.clone(); let request = request.clone(); async move { let resp = http_client() .post(&url) .json(&request) .send() .await .map_err(|e| { TuskError::Ai(format!("Cannot connect to Ollama at {}: {}", url, e)) })?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); return Err(TuskError::Ai(format!( "Ollama error ({}): {}", status, body ))); } let chat_resp: OllamaChatResponse = resp .json() .await .map_err(|e| TuskError::Ai(format!("Failed to parse Ollama response: {}", e)))?; Ok(chat_resp.message.content) } }) .await } // --------------------------------------------------------------------------- // SQL generation // --------------------------------------------------------------------------- #[tauri::command] pub async fn generate_sql( app: AppHandle, state: State<'_, Arc>, connection_id: String, prompt: String, ) -> TuskResult { let schema_text = build_schema_context(&state, &connection_id).await?; let system_prompt = format!( "You are an expert PostgreSQL query generator. You receive a database schema and a natural \ language request. Output ONLY a valid, executable PostgreSQL SQL query.\n\ \n\ OUTPUT FORMAT:\n\ - Raw SQL only. No explanations, no markdown code fences (```), no comments, no preamble.\n\ - The output must be directly executable in psql.\n\ - For complex queries use readable formatting with line breaks and indentation.\n\ \n\ CRITICAL RULES:\n\ 1. ONLY reference tables and columns that exist in the schema. Never invent names.\n\ 2. 
Use the FOREIGN KEY information to determine correct JOIN conditions.\n\ 3. Use LEFT JOIN when the FK column is nullable or the relationship is optional; \ INNER JOIN when both sides must exist.\n\ 4. Every non-aggregated column in SELECT must appear in GROUP BY.\n\ 5. Use COALESCE for nullable columns in aggregations: COALESCE(SUM(x), 0).\n\ 6. For enum columns, use ONLY the values listed in the ENUM TYPES section.\n\ 7. Use IS NULL / IS NOT NULL for null checks — never = NULL or != NULL.\n\ 8. Add LIMIT when the user asks for \"top N\", \"first N\", \"latest N\", etc.\n\ 9. Qualify column names with table alias when the query involves multiple tables.\n\ \n\ SEMANTIC RULES (very important):\n\ - When a table has both actual_* and planned_* columns (e.g. actual_start vs planned_start), \ they represent DIFFERENT concepts: planned = future estimate, actual = what really happened. \ NEVER mix them with COALESCE unless the user explicitly requests a fallback.\n\ - For time-based calculations involving real events (\"how long did X take\", \"average time between\"), \ use ONLY actual/factual timestamps (actual_*, started_at, completed_at, ended_at). \ Filter out NULL values with WHERE instead of falling back to planned timestamps.\n\ - Planned timestamps (planned_*, scheduled_*, estimated_*) should ONLY be used when the user \ asks about plans, schedules, SLA, or compares plan vs fact.\n\ - When computing durations or averages, always filter out rows where any involved timestamp \ is NULL rather than substituting with unrelated defaults.\n\ - Pay attention to column descriptions/comments in the schema — they reveal business semantics \ that are critical for correct queries.\n\ \n\ TYPE RULES:\n\ - timestamp - timestamp = interval. 
For seconds: EXTRACT(EPOCH FROM (ts1 - ts2)).\n\ - interval cannot be cast to numeric directly; use EXTRACT(EPOCH FROM interval).\n\ - UNION/UNION ALL requires matching column count and compatible types; cast enums to text.\n\ - Use ::type for PostgreSQL-style casts.\n\ - For array columns use ANY, ALL, @>, <@ operators.\n\ - For JSONB columns use ->, ->>, #>, jsonb_extract_path.\n\ \n\ COMMON PATTERNS:\n\ - FIRST/LAST per group: to find MIN(started_at) per trip, use \ \"SELECT trip_id, MIN(started_at) FROM t GROUP BY trip_id\". \ NEVER put the aggregated column (started_at) into GROUP BY — that defeats the aggregation \ and returns every row separately instead of one per group.\n\ - TOP-1 per group with extra columns: use DISTINCT ON (group_col) ... ORDER BY group_col, sort_col \ or a subquery with ROW_NUMBER() OVER (PARTITION BY group_col ORDER BY sort_col) = 1.\n\ - For \"time from A to B\" calculations, ensure both timestamps are NOT NULL with WHERE filters; \ never use COALESCE to mix planned and actual timestamps.\n\ \n\ BEST PRACTICES:\n\ - Use ILIKE for case-insensitive text search, LIKE for case-sensitive.\n\ - Use EXISTS instead of IN for subquery existence checks.\n\ - Use CTE (WITH ... AS) for complex multi-step logic.\n\ - Use window functions (ROW_NUMBER, RANK, LAG, LEAD, SUM OVER) for ranking and running totals.\n\ - Use date_trunc('period', column) for time-based grouping.\n\ - Use generate_series() for creating ranges.\n\ - Use string_agg(col, ', ') for concatenating grouped values.\n\ - Use FILTER (WHERE ...) 
for conditional aggregation instead of CASE inside aggregate.\n\ \n\ {}\n", schema_text ); let raw = call_ollama_chat(&app, &state, system_prompt, prompt).await?; Ok(clean_sql_response(&raw)) } // --------------------------------------------------------------------------- // SQL explanation // --------------------------------------------------------------------------- #[tauri::command] pub async fn explain_sql( app: AppHandle, state: State<'_, Arc>, connection_id: String, sql: String, ) -> TuskResult { let schema_text = build_schema_context(&state, &connection_id).await?; let system_prompt = format!( "You are a PostgreSQL expert. Explain the given SQL query clearly and concisely.\n\ \n\ Structure your explanation as:\n\ 1. **Summary** — one sentence describing what the query returns in business terms.\n\ 2. **Step-by-step breakdown** — explain tables accessed, joins, filters, aggregations, \ subqueries, and sorting. Use bullet points.\n\ 3. **Notes** — mention edge cases, potential issues, or performance concerns if any.\n\ \n\ Use the database schema below to understand table relationships and column meanings.\n\ Keep the explanation short; avoid restating the SQL verbatim.\n\ \n\ IMPORTANT: If you notice semantic issues (e.g. 
mixing planned_* and actual_* timestamps \ with COALESCE, comparing unrelated columns, missing NULL filters on nullable timestamps), \ mention them in the Notes section as potential problems.\n\ \n\ {}\n", schema_text ); call_ollama_chat(&app, &state, system_prompt, sql).await } // --------------------------------------------------------------------------- // SQL error fixing // --------------------------------------------------------------------------- #[tauri::command] pub async fn fix_sql_error( app: AppHandle, state: State<'_, Arc>, connection_id: String, sql: String, error_message: String, ) -> TuskResult { let schema_text = build_schema_context(&state, &connection_id).await?; let system_prompt = format!( "You are a PostgreSQL expert debugger. You receive a SQL query and the error it produced. \ Fix the query so it executes correctly.\n\ \n\ OUTPUT FORMAT:\n\ - Raw SQL only. No explanations, no markdown code fences (```), no comments.\n\ - The output must be directly executable.\n\ \n\ DIAGNOSTIC CHECKLIST:\n\ - Column/table does not exist → check the schema for correct names and spelling.\n\ - Column is ambiguous → qualify with table name or alias.\n\ - Must appear in GROUP BY → add missing non-aggregated columns to GROUP BY.\n\ - Type mismatch → add appropriate casts (::text, ::integer, etc.).\n\ - Permission denied → wrap in a read-only transaction if needed.\n\ - Syntax error → correct PostgreSQL syntax (check commas, parentheses, keywords).\n\ - Subquery returns more than one row → use IN, ANY, or add LIMIT 1.\n\ - Division by zero → wrap divisor with NULLIF(x, 0).\n\ \n\ ONLY use tables and columns from the schema below. 
Never invent names.\n\ Preserve the original intent of the query; change only what is necessary to fix the error.\n\ \n\ {}\n", schema_text ); let user_content = format!("SQL query:\n{}\n\nError message:\n{}", sql, error_message); let raw = call_ollama_chat(&app, &state, system_prompt, user_content).await?; Ok(clean_sql_response(&raw)) } // --------------------------------------------------------------------------- // Lite overview builder (chat v2) // --------------------------------------------------------------------------- const OVERVIEW_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(300); /// Build a compact overview of a connection's databases and active-DB tables. /// Designed to be small enough to inject in every system prompt. /// Cached per connection_id; invalidated by switch_database / disconnect. pub(crate) async fn build_overview_context( state: &AppState, connection_id: &str, ) -> TuskResult { if let Some(cached) = state.overview_cache.read().await.get(connection_id).cloned() { if cached.cached_at.elapsed() < OVERVIEW_CACHE_TTL { return Ok(cached.value); } } let flavor = state.get_flavor(connection_id).await; let text = match flavor { crate::state::DbFlavor::ClickHouse => build_overview_clickhouse(state, connection_id).await?, _ => build_overview_postgres(state, connection_id).await?, }; let mut cache = state.overview_cache.write().await; cache.insert( connection_id.to_string(), crate::state::CachedString { value: text.clone(), cached_at: std::time::Instant::now(), }, ); Ok(text) } async fn build_overview_postgres(state: &AppState, connection_id: &str) -> TuskResult { let pool = state.get_pool(connection_id).await?; let (version_res, current_db_res, dbs_res, tables_res) = tokio::join!( sqlx::query_scalar::<_, String>("SELECT version()").fetch_one(&pool), sqlx::query_scalar::<_, String>("SELECT current_database()").fetch_one(&pool), sqlx::query_scalar::<_, String>( "SELECT datname FROM pg_database \ WHERE datistemplate = false AND 
datallowconn = true \ ORDER BY datname" ) .fetch_all(&pool), sqlx::query_as::<_, (String, String)>( "SELECT table_schema, table_name FROM information_schema.tables \ WHERE table_schema NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit') \ AND table_type = 'BASE TABLE' \ ORDER BY table_schema, table_name" ) .fetch_all(&pool), ); let version = version_res.unwrap_or_default(); let current_db = current_db_res.unwrap_or_default(); let all_dbs = dbs_res.unwrap_or_default(); let tables = tables_res.unwrap_or_default(); let short_version = version .split_whitespace() .take(2) .collect::>() .join(" "); let mut out = Vec::::new(); out.push(format!("DATABASE: {}", short_version)); if !current_db.is_empty() { out.push(format!("ACTIVE DATABASE: {}", current_db)); } out.push(String::new()); if !all_dbs.is_empty() { out.push("DATABASES ON THIS SERVER:".to_string()); for db in &all_dbs { if db == ¤t_db { out.push(format!(" * {} (active)", db)); } else { out.push(format!(" {}", db)); } } out.push(String::new()); } out.push(format!( "TABLES IN ACTIVE DATABASE ({}):", tables.len() )); for (schema, name) in &tables { out.push(format!(" {}.{}", schema, name)); } out.push(String::new()); out.push( "NOTE: Tables of other databases are not enumerated here. \ Call list_tables({\"database\":\"\"}) to see them. For PostgreSQL this also \ requires switch_database before you can actually query data there." 
.to_string(), ); Ok(out.join("\n")) } async fn build_overview_clickhouse(state: &AppState, connection_id: &str) -> TuskResult { let client = state.get_ch_client(connection_id).await?; let active_db = client.database.clone(); let dbs_rows = client .fetch_objects( "SELECT name FROM system.databases \ WHERE name NOT IN ('system','INFORMATION_SCHEMA','information_schema') \ ORDER BY name", ) .await .unwrap_or_default(); let table_rows = client .fetch_objects( "SELECT database, name FROM system.tables \ WHERE database NOT IN ('system','INFORMATION_SCHEMA','information_schema') \ ORDER BY database, name", ) .await .unwrap_or_default(); let version = client.ping().await.unwrap_or_default(); let mut out = Vec::::new(); out.push("DATABASE: ClickHouse".to_string()); if !version.is_empty() { out.push(format!("VERSION: {}", version.trim())); } out.push(format!("ACTIVE DATABASE: {}", active_db)); out.push(String::new()); if !dbs_rows.is_empty() { out.push("DATABASES ON THIS SERVER:".to_string()); for row in &dbs_rows { let name = row.get("name").and_then(|v| v.as_str()).unwrap_or(""); if name == active_db { out.push(format!(" * {} (active)", name)); } else { out.push(format!(" {}", name)); } } out.push(String::new()); } out.push(format!("TABLES ACROSS ALL DATABASES ({}):", table_rows.len())); for row in &table_rows { let dbn = row.get("database").and_then(|v| v.as_str()).unwrap_or(""); let tbl = row.get("name").and_then(|v| v.as_str()).unwrap_or(""); if !dbn.is_empty() && !tbl.is_empty() { out.push(format!(" {}.{}", dbn, tbl)); } } out.push(String::new()); out.push( "NOTE: ClickHouse allows fully-qualified `db.table` queries — \ you can reference any table in this list directly without switching databases." 
.to_string(), ); Ok(out.join("\n")) } // --------------------------------------------------------------------------- // Full schema context builder (legacy — used by generate_sql/explain_sql/fix_sql_error) // --------------------------------------------------------------------------- pub(crate) async fn build_schema_context( state: &AppState, connection_id: &str, ) -> TuskResult { // Check cache first if let Some(cached) = state.get_schema_cache(connection_id).await { return Ok(cached); } if matches!(state.get_flavor(connection_id).await, DbFlavor::ClickHouse) { return build_clickhouse_schema_context(state, connection_id).await; } let pool = state.get_pool(connection_id).await?; // Run all metadata queries in parallel for speed let ( version_res, current_db_res, all_dbs_res, col_res, fk_res, enum_res, tbl_comment_res, col_comment_res, unique_res, varchar_res, jsonb_res, ) = tokio::join!( sqlx::query_scalar::<_, String>("SELECT version()").fetch_one(&pool), sqlx::query_scalar::<_, String>("SELECT current_database()").fetch_one(&pool), sqlx::query_scalar::<_, String>( "SELECT datname FROM pg_database \ WHERE datistemplate = false AND datallowconn = true \ ORDER BY datname" ) .fetch_all(&pool), fetch_columns(&pool), fetch_foreign_keys_raw(&pool), fetch_enum_types(&pool), fetch_table_comments(&pool), fetch_column_comments(&pool), fetch_unique_constraints(&pool), fetch_varchar_values(&pool), fetch_jsonb_keys(&pool), ); let version = version_res.map_err(TuskError::Database)?; let current_db = current_db_res.unwrap_or_default(); let all_dbs = all_dbs_res.unwrap_or_default(); let col_rows = col_res?; let fk_rows = fk_res?; let enum_map = enum_res?; let tbl_comments = tbl_comment_res?; let col_comments = col_comment_res?; let unique_constraints = unique_res?; let varchar_values = varchar_res.unwrap_or_default(); let jsonb_keys = jsonb_res.unwrap_or_default(); // -- Build FK inline lookup: (schema, table, column) -> "ref_schema.ref_table(ref_col)" -- let mut fk_inline: 
HashMap<(String, String, String), String> = HashMap::new(); let mut fk_lines: Vec = Vec::new(); for fk in &fk_rows { let line = format!( "FK: {}.{}({}) -> {}.{}({})", fk.schema, fk.table, fk.columns.join(", "), fk.ref_schema, fk.ref_table, fk.ref_columns.join(", ") ); fk_lines.push(line); // For single-column FKs, enable inline annotation on column if fk.columns.len() == 1 && fk.ref_columns.len() == 1 { fk_inline.insert( (fk.schema.clone(), fk.table.clone(), fk.columns[0].clone()), format!("{}.{}({})", fk.ref_schema, fk.ref_table, fk.ref_columns[0]), ); } } // -- Build unique constraint lookup: (schema, table) -> Vec -- let mut unique_map: HashMap<(String, String), Vec> = HashMap::new(); for (schema, table, cols) in &unique_constraints { unique_map .entry((schema.clone(), table.clone())) .or_default() .push(cols.join(", ")); } // -- Format output -- let mut output: Vec = Vec::new(); // 1. PostgreSQL version (short form) let short_version = version .split_whitespace() .take(2) .collect::>() .join(" "); output.push(format!("DATABASE SCHEMA ({})", short_version)); if !current_db.is_empty() { output.push(format!("ACTIVE DATABASE: {}", current_db)); } output.push(String::new()); // 2. Cluster topology — other databases on this server. // Each PG database is isolated; cross-DB queries are not possible from a single connection. if all_dbs.len() > 1 { output.push("DATABASES ON THIS SERVER:".to_string()); for db in &all_dbs { if db == ¤t_db { output.push(format!(" * {} (active)", db)); } else { output.push(format!(" {}", db)); } } output.push(String::new()); output.push( "NOTE: Tables in other databases are NOT queryable from this session. \ If the user's question concerns data likely stored in a different database \ (e.g. an identity service in a separate DB), respond with a `final` message \ asking them to switch the active database via the connection selector." .to_string(), ); output.push(String::new()); } // 3. 
Quick table+column index for fast existence checks before writing SQL. // Each line lists `schema.table(col1, col2, ...)` so the model can grep both // table names and column names without scrolling through the full TABLES section. { let mut by_table: BTreeMap<(String, String), Vec> = BTreeMap::new(); for c in &col_rows { by_table .entry((c.schema.clone(), c.table.clone())) .or_default() .push(c.column.clone()); } if !by_table.is_empty() { output.push(format!( "TABLE INDEX (database `{}`, {} tables — table_name(column_list)):", current_db, by_table.len() )); for ((schema, table), cols) in &by_table { output.push(format!(" {}.{}({})", schema, table, cols.join(", "))); } output.push(String::new()); } } // 4. Enum types if !enum_map.is_empty() { output.push("ENUM TYPES:".to_string()); for (type_name, values) in &enum_map { let vals_str = values .iter() .map(|v| format!("'{}'", v)) .collect::>() .join(", "); output.push(format!(" {} = [{}]", type_name, vals_str)); } output.push(String::new()); } // 5. Tables with columns output.push("TABLES:".to_string()); // Group columns by schema.table preserving order let mut tables: BTreeMap> = BTreeMap::new(); for ci in &col_rows { let key = format!("{}.{}", ci.schema, ci.table); tables.entry(key).or_default().push(ci.clone()); } for (full_name, columns) in &tables { format_table_block( full_name, columns, &tbl_comments, &col_comments, &fk_inline, &enum_map, &unique_map, &varchar_values, &jsonb_keys, &mut output, ); } // 6. 
Foreign keys summary if !fk_lines.is_empty() { output.push(String::new()); output.push("FOREIGN KEYS:".to_string()); for fk in &fk_lines { output.push(format!(" {}", fk)); } } let result = output.join("\n"); // Cache the result state .set_schema_cache(connection_id.to_string(), result.clone()) .await; Ok(result) } // --------------------------------------------------------------------------- // Schema query helpers // --------------------------------------------------------------------------- #[derive(Clone)] pub(crate) struct ColumnInfo { pub(crate) schema: String, pub(crate) table: String, pub(crate) column: String, pub(crate) data_type: String, pub(crate) not_null: bool, pub(crate) is_pk: bool, pub(crate) column_default: Option, } /// Render a single table's column block in the human/LLM-readable schema format. /// Reused by both `build_schema_context` (full DDL for legacy AI commands) and /// the new `get_columns` chat tool. #[allow(clippy::too_many_arguments)] pub(crate) fn format_table_block( full_name: &str, columns: &[ColumnInfo], tbl_comments: &HashMap, col_comments: &HashMap<(String, String, String), String>, fk_inline: &HashMap<(String, String, String), String>, enum_map: &BTreeMap>, unique_map: &HashMap<(String, String), Vec>, varchar_values: &HashMap<(String, String, String), Vec>, jsonb_keys: &HashMap<(String, String, String), Vec>, output: &mut Vec, ) { let tbl_comment = tbl_comments.get(full_name).map(|c| c.as_str()); match tbl_comment { Some(comment) => output.push(format!("\nTABLE {} -- {}", full_name, comment)), None => output.push(format!("\nTABLE {}", full_name)), } for ci in columns { let mut parts: Vec = vec![ci.column.clone(), ci.data_type.clone()]; if ci.is_pk { parts.push("PK".to_string()); } if ci.not_null && !ci.is_pk { parts.push("NOT NULL".to_string()); } if let Some(ref_target) = fk_inline.get(&(ci.schema.clone(), ci.table.clone(), ci.column.clone())) { parts.push(format!("FK->{}", ref_target)); } if let Some(ref def) = 
ci.column_default { let simplified = simplify_default(def); if !simplified.is_empty() { parts.push(format!("DEFAULT {}", simplified)); } } let col_key = (ci.schema.clone(), ci.table.clone(), ci.column.clone()); let col_comment = col_comments.get(&col_key); let enum_annotation = enum_map.get(&ci.data_type); let mut suffix_parts: Vec = Vec::new(); if let Some(vals) = enum_annotation { let vals_str = vals .iter() .map(|v| format!("'{}'", v)) .collect::>() .join(", "); suffix_parts.push(format!("enum: {}", vals_str)); } if enum_annotation.is_none() { if let Some(vals) = varchar_values.get(&col_key) { let vals_str = vals .iter() .take(15) .map(|v| format!("'{}'", v)) .collect::>() .join(", "); suffix_parts.push(format!("values: {}", vals_str)); } } if let Some(keys) = jsonb_keys.get(&col_key) { suffix_parts.push(format!("json keys: {}", keys.join(", "))); } if let Some(comment) = col_comment { suffix_parts.push(comment.clone()); } if suffix_parts.is_empty() { output.push(format!(" {}", parts.join(" "))); } else { output.push(format!( " {} -- {}", parts.join(" "), suffix_parts.join("; ") )); } } // Unique constraints let schema_table: Vec<&str> = full_name.splitn(2, '.').collect(); if schema_table.len() == 2 { if let Some(uqs) = unique_map.get(&(schema_table[0].to_string(), schema_table[1].to_string())) { for uq_cols in uqs { output.push(format!(" UNIQUE({})", uq_cols)); } } } } async fn build_clickhouse_schema_context( state: &AppState, connection_id: &str, ) -> TuskResult { let client = state.get_ch_client(connection_id).await?; let db = client.database.clone(); // ClickHouse exposes ALL databases via system.* — pull cross-DB schema in one shot. 
let columns_sql = "SELECT database, table, name, type, is_in_primary_key \ FROM system.columns \ WHERE database NOT IN ('system','INFORMATION_SCHEMA','information_schema') \ ORDER BY database, table, position"; let dbs_sql = "SELECT name FROM system.databases \ WHERE name NOT IN ('system','INFORMATION_SCHEMA','information_schema') \ ORDER BY name"; let rows = client.fetch_objects(columns_sql).await?; let db_rows = client.fetch_objects(dbs_sql).await.unwrap_or_default(); let version = client.ping().await.unwrap_or_default(); let mut out = String::new(); out.push_str("DATABASE: ClickHouse\n"); if !version.is_empty() { out.push_str(&format!("VERSION: {}\n", version.trim())); } out.push_str(&format!("ACTIVE_DATABASE: {}\n\n", db)); // Cluster overview if !db_rows.is_empty() { out.push_str("DATABASES ON THIS SERVER:\n"); for row in &db_rows { let name = row.get("name").and_then(|v| v.as_str()).unwrap_or(""); if name == db { out.push_str(&format!(" * {} (active)\n", name)); } else { out.push_str(&format!(" {}\n", name)); } } out.push('\n'); } // Table+column index — same shape as the PG path so model has a uniform reference. 
{ let mut by_table: BTreeMap<(String, String), Vec> = BTreeMap::new(); for r in &rows { let dbn = r.get("database").and_then(|v| v.as_str()).unwrap_or("").to_string(); let tbl = r.get("table").and_then(|v| v.as_str()).unwrap_or("").to_string(); let col = r.get("name").and_then(|v| v.as_str()).unwrap_or("").to_string(); if !dbn.is_empty() && !tbl.is_empty() && !col.is_empty() { by_table.entry((dbn, tbl)).or_default().push(col); } } if !by_table.is_empty() { out.push_str(&format!( "TABLE INDEX ({} tables across all databases — db.table(column_list)):\n", by_table.len() )); for ((dbn, tbl), cols) in &by_table { out.push_str(&format!(" {}.{}({})\n", dbn, tbl, cols.join(", "))); } out.push('\n'); } } out.push_str("TABLES:\n"); let mut current_key: Option = None; for row in &rows { let dbn = row.get("database").and_then(|v| v.as_str()).unwrap_or("").to_string(); let table = row.get("table").and_then(|v| v.as_str()).unwrap_or("").to_string(); let column = row.get("name").and_then(|v| v.as_str()).unwrap_or(""); let dtype = row.get("type").and_then(|v| v.as_str()).unwrap_or(""); let is_pk = matches!(row.get("is_in_primary_key"), Some(serde_json::Value::Number(n)) if n.as_i64() == Some(1)) || matches!(row.get("is_in_primary_key"), Some(serde_json::Value::String(s)) if s == "1"); let key = format!("{}.{}", dbn, table); if Some(&key) != current_key.as_ref() { out.push_str(&format!("\nTABLE {}.{}\n", dbn, table)); current_key = Some(key); } out.push_str(&format!( " {} {}{}\n", column, dtype, if is_pk { " [PK]" } else { "" } )); } out.push_str( "\nNOTES:\n\ - Use ClickHouse SQL dialect. Functions differ from PostgreSQL (e.g. 
count(), arrayJoin, toDate, formatDateTime).\n\ - ClickHouse allows fully-qualified `database.table` in queries — you CAN cross-reference databases on this server.\n\ - Read-only mode is enforced for the agent: only SELECT/WITH/EXPLAIN/SHOW/DESCRIBE allowed.\n\ - Always include LIMIT for ad-hoc SELECTs.\n", ); state .set_schema_cache(connection_id.to_string(), out.clone()) .await; Ok(out) } pub(crate) async fn fetch_columns(pool: &sqlx::PgPool) -> TuskResult> { let rows = sqlx::query( "SELECT \ c.table_schema, c.table_name, c.column_name, \ CASE \ WHEN c.data_type = 'USER-DEFINED' THEN c.udt_name \ WHEN c.data_type = 'ARRAY' THEN c.udt_name || '[]' \ ELSE c.data_type \ END AS data_type, \ c.is_nullable = 'NO' AS not_null, \ c.column_default, \ EXISTS( \ SELECT 1 FROM information_schema.table_constraints tc \ JOIN information_schema.key_column_usage kcu \ ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema \ WHERE tc.constraint_type = 'PRIMARY KEY' \ AND tc.table_schema = c.table_schema \ AND tc.table_name = c.table_name \ AND kcu.column_name = c.column_name \ ) AS is_pk \ FROM information_schema.columns c \ WHERE c.table_schema NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'gp_toolkit') \ ORDER BY c.table_schema, c.table_name, c.ordinal_position", ) .fetch_all(pool) .await .map_err(TuskError::Database)?; Ok(rows .iter() .map(|r| ColumnInfo { schema: r.get(0), table: r.get(1), column: r.get(2), data_type: r.get(3), not_null: r.get(4), column_default: r.get(5), is_pk: r.get(6), }) .collect()) } pub(crate) struct ForeignKeyInfo { pub(crate) schema: String, pub(crate) table: String, pub(crate) columns: Vec, pub(crate) ref_schema: String, pub(crate) ref_table: String, pub(crate) ref_columns: Vec, } pub(crate) async fn fetch_foreign_keys_raw(pool: &sqlx::PgPool) -> TuskResult> { let rows = sqlx::query( "SELECT \ cn.nspname AS schema_name, cl.relname AS table_name, \ array_agg(DISTINCT a.attname ORDER BY a.attname) AS columns, \ 
cnf.nspname AS ref_schema, clf.relname AS ref_table, \ array_agg(DISTINCT af.attname ORDER BY af.attname) AS ref_columns \ FROM pg_constraint con \ JOIN pg_class cl ON con.conrelid = cl.oid \ JOIN pg_namespace cn ON cl.relnamespace = cn.oid \ JOIN pg_class clf ON con.confrelid = clf.oid \ JOIN pg_namespace cnf ON clf.relnamespace = cnf.oid \ JOIN pg_attribute a ON a.attrelid = con.conrelid AND a.attnum = ANY(con.conkey) \ JOIN pg_attribute af ON af.attrelid = con.confrelid AND af.attnum = ANY(con.confkey) \ WHERE con.contype = 'f' \ AND cn.nspname NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit') \ GROUP BY cn.nspname, cl.relname, cnf.nspname, clf.relname, con.oid", ) .fetch_all(pool) .await .map_err(TuskError::Database)?; Ok(rows .iter() .map(|r| ForeignKeyInfo { schema: r.get(0), table: r.get(1), columns: r.get(2), ref_schema: r.get(3), ref_table: r.get(4), ref_columns: r.get(5), }) .collect()) } /// Returns BTreeMap> ordered by type name pub(crate) async fn fetch_enum_types(pool: &sqlx::PgPool) -> TuskResult>> { let rows = sqlx::query( "SELECT t.typname, \ array_agg(e.enumlabel ORDER BY e.enumsortorder) AS vals \ FROM pg_enum e \ JOIN pg_type t ON e.enumtypid = t.oid \ JOIN pg_namespace n ON t.typnamespace = n.oid \ WHERE n.nspname NOT IN ('pg_catalog', 'information_schema') \ GROUP BY t.typname \ ORDER BY t.typname", ) .fetch_all(pool) .await .map_err(TuskError::Database)?; let mut map = BTreeMap::new(); for r in &rows { let name: String = r.get(0); let vals: Vec = r.get(1); map.insert(name, vals); } Ok(map) } /// Returns HashMap<"schema.table", comment> pub(crate) async fn fetch_table_comments(pool: &sqlx::PgPool) -> TuskResult> { let rows = sqlx::query( "SELECT n.nspname, c.relname, obj_description(c.oid, 'pg_class') \ FROM pg_class c \ JOIN pg_namespace n ON c.relnamespace = n.oid \ WHERE c.relkind IN ('r', 'v', 'p', 'm') \ AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'gp_toolkit') \ AND obj_description(c.oid, 
'pg_class') IS NOT NULL", ) .fetch_all(pool) .await .map_err(TuskError::Database)?; let mut map = HashMap::new(); for r in &rows { let schema: String = r.get(0); let table: String = r.get(1); let comment: String = r.get(2); map.insert(format!("{}.{}", schema, table), comment); } Ok(map) } /// Returns HashMap<(schema, table, column), comment> pub(crate) async fn fetch_column_comments( pool: &sqlx::PgPool, ) -> TuskResult> { let rows = sqlx::query( "SELECT n.nspname, c.relname, a.attname, d.description \ FROM pg_description d \ JOIN pg_class c ON d.objoid = c.oid \ JOIN pg_namespace n ON c.relnamespace = n.oid \ JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = d.objsubid \ WHERE d.objsubid > 0 \ AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'gp_toolkit')", ) .fetch_all(pool) .await .map_err(TuskError::Database)?; let mut map = HashMap::new(); for r in &rows { let schema: String = r.get(0); let table: String = r.get(1); let column: String = r.get(2); let comment: String = r.get(3); map.insert((schema, table, column), comment); } Ok(map) } /// Returns Vec<(schema, table, Vec)> for UNIQUE constraints pub(crate) async fn fetch_unique_constraints( pool: &sqlx::PgPool, ) -> TuskResult)>> { let rows = sqlx::query( "SELECT n.nspname, cl.relname, \ array_agg(a.attname ORDER BY a.attnum) AS cols \ FROM pg_constraint con \ JOIN pg_class cl ON con.conrelid = cl.oid \ JOIN pg_namespace n ON cl.relnamespace = n.oid \ JOIN pg_attribute a ON a.attrelid = con.conrelid AND a.attnum = ANY(con.conkey) \ WHERE con.contype = 'u' \ AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'gp_toolkit') \ GROUP BY n.nspname, cl.relname, con.oid \ ORDER BY n.nspname, cl.relname", ) .fetch_all(pool) .await .map_err(TuskError::Database)?; Ok(rows .iter() .map(|r| { let schema: String = r.get(0); let table: String = r.get(1); let cols: Vec = r.get(2); (schema, table, cols) }) .collect()) } /// Returns HashMap<(schema, table, column), Vec> for varchar 
columns /// with few distinct values (pseudo-enums), using pg_stats for zero-cost discovery. /// Returns None if pg_stats is not accessible (graceful degradation). async fn fetch_varchar_values( pool: &sqlx::PgPool, ) -> Option>> { let rows = match sqlx::query( "SELECT s.schemaname, s.tablename, s.attname, \ s.most_common_vals::text AS vals \ FROM pg_stats s \ JOIN information_schema.columns c \ ON c.table_schema = s.schemaname \ AND c.table_name = s.tablename \ AND c.column_name = s.attname \ WHERE s.schemaname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'gp_toolkit') \ AND c.data_type = 'character varying' \ AND s.n_distinct > 0 AND s.n_distinct <= 20 \ AND s.most_common_vals IS NOT NULL \ ORDER BY s.schemaname, s.tablename, s.attname", ) .fetch_all(pool) .await { Ok(r) => r, Err(e) => { log::warn!("Failed to fetch varchar values from pg_stats: {}", e); return None; } }; let mut map = HashMap::new(); for r in &rows { let schema: String = r.get(0); let table: String = r.get(1); let column: String = r.get(2); let vals_text: String = r.get(3); let vals = parse_pg_array_text(&vals_text); if !vals.is_empty() { map.insert((schema, table, column), vals); } } Some(map) } /// Discovers top-level keys in JSONB columns by sampling actual data. /// Runs two sequential queries internally: first discovers JSONB columns, /// then samples keys from each via a single UNION ALL query. /// Returns None on error (graceful degradation). 
async fn fetch_jsonb_keys( pool: &sqlx::PgPool, ) -> Option>> { // Step 1: Find all JSONB columns let col_rows = match sqlx::query( "SELECT table_schema, table_name, column_name \ FROM information_schema.columns \ WHERE data_type = 'jsonb' \ AND table_schema NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'gp_toolkit') \ ORDER BY table_schema, table_name, column_name", ) .fetch_all(pool) .await { Ok(r) => r, Err(e) => { log::warn!("Failed to fetch JSONB columns: {}", e); return None; } }; if col_rows.is_empty() { return Some(HashMap::new()); } // Cap at 50 JSONB columns to prevent unbounded UNION ALL queries on large schemas let columns: Vec<(String, String, String)> = col_rows .iter() .take(50) .map(|r| { ( r.get::(0), r.get::(1), r.get::(2), ) }) .collect(); // Step 2: Build a single UNION ALL query to sample keys from all JSONB columns let parts: Vec = columns .iter() .enumerate() .map(|(i, (schema, table, col))| { let qs = schema.replace('"', "\"\""); let qt = table.replace('"', "\"\""); let qc = col.replace('"', "\"\""); format!( "(SELECT '{}.{}.{}' AS col_ref, key FROM (\ SELECT DISTINCT jsonb_object_keys(\"{}\") AS key \ FROM \"{}\".\"{}\" \ WHERE \"{}\" IS NOT NULL AND jsonb_typeof(\"{}\") = 'object' \ LIMIT 50\ ) sub{})", schema, table, col, qc, qs, qt, qc, qc, i ) }) .collect(); let query = parts.join(" UNION ALL "); let rows = match sqlx::query(&query).fetch_all(pool).await { Ok(r) => r, Err(e) => { log::warn!("Failed to fetch JSONB keys: {}", e); return None; } }; let mut map: HashMap<(String, String, String), Vec> = HashMap::new(); for r in &rows { let col_ref: String = r.get(0); let key: String = r.get(1); let ref_parts: Vec<&str> = col_ref.splitn(3, '.').collect(); if ref_parts.len() == 3 { let entry = map .entry(( ref_parts[0].to_string(), ref_parts[1].to_string(), ref_parts[2].to_string(), )) .or_default(); if !entry.contains(&key) { entry.push(key); } } } for vals in map.values_mut() { vals.sort(); } Some(map) } // 
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Parses PostgreSQL text representation of arrays: `{val1,val2,"val with comma"}`.
///
/// Follows the array output format:
/// - elements are separated by `,`; the outer `{` / `}` are optional here;
/// - a double-quoted element may contain commas and whitespace and is returned
///   verbatim (not trimmed), so `""` yields an empty-string element;
/// - a backslash escapes the next character (`\"` -> `"`, `\\` -> `\`);
/// - a doubled quote (`""`) inside a quoted element is also read as one `"`;
/// - unquoted elements are trimmed.
///
/// Returns an empty vector for empty input or `{}`. Nested arrays and the
/// unquoted `NULL` marker get no special treatment.
fn parse_pg_array_text(s: &str) -> Vec<String> {
    /// Finalizes one element: quoted text is significant as-is, bare text is trimmed.
    fn finish(raw: &str, quoted: bool) -> String {
        if quoted {
            raw.to_string()
        } else {
            raw.trim().to_string()
        }
    }

    let s = s.trim();
    let s = s.strip_prefix('{').unwrap_or(s);
    let s = s.strip_suffix('}').unwrap_or(s);
    if s.is_empty() {
        return Vec::new();
    }

    let mut result = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;
    // Whether the element being built contained a quoted section.
    let mut quoted = false;
    let mut chars = s.chars().peekable();

    while let Some(ch) = chars.next() {
        match ch {
            // Escape: take the next character literally. A dangling trailing
            // backslash is kept as-is.
            '\\' => current.push(chars.next().unwrap_or('\\')),
            '"' if in_quotes => {
                if chars.next_if_eq(&'"').is_some() {
                    current.push('"');
                } else {
                    in_quotes = false;
                }
            }
            '"' => {
                in_quotes = true;
                quoted = true;
            }
            ',' if !in_quotes => {
                result.push(finish(&current, quoted));
                current.clear();
                quoted = false;
            }
            _ => current.push(ch),
        }
    }

    // `quoted` keeps a lone `""` element from being dropped.
    if quoted || !current.is_empty() || !result.is_empty() {
        result.push(finish(&current, quoted));
    }

    result
}

/// Shortens a column default expression for display.
///
/// - anything containing `nextval(` becomes `"auto-increment"`;
/// - `now()` / `CURRENT_TIMESTAMP` (any ASCII case) become `"now()"`;
/// - expressions longer than 50 bytes are dropped (empty string);
/// - everything else is returned trimmed.
fn simplify_default(raw: &str) -> String {
    let s = raw.trim();
    if s.contains("nextval(") {
        return "auto-increment".to_string();
    }
    // Shorten common defaults
    if s.eq_ignore_ascii_case("now()") || s.eq_ignore_ascii_case("current_timestamp") {
        return "now()".to_string();
    }
    // Numeric/string literals — keep short ones, skip very long generated defaults
    if s.len() > 50 {
        return String::new();
    }
    s.to_string()
}

/// Extracts the SQL text from a model response, removing markdown code fences.
///
/// - A response without fences is returned trimmed.
/// - For a fenced block, the opening fence, a recognized language tag (matched
///   case-insensitively) and everything from the closing fence onwards are removed.
/// - Prose before the block is dropped only when the block is properly closed;
///   otherwise the trimmed response is returned unchanged.
/// - A response that starts with a fence but never closes it yields everything
///   after the opening fence.
fn clean_sql_response(raw: &str) -> String {
    const FENCE: &str = "```";
    // Info strings that mark a SQL block. Any other word after the fence is
    // assumed to be the query itself (e.g. "```SELECT 1```").
    const LANGUAGE_TAGS: [&str; 9] = [
        "sql",
        "postgresql",
        "postgres",
        "pgsql",
        "plpgsql",
        "psql",
        "clickhouse",
        "mysql",
        "sqlite",
    ];

    let trimmed = raw.trim();
    let Some(open) = trimmed.find(FENCE) else {
        return trimmed.to_string();
    };

    let after_fence = &trimmed[open + FENCE.len()..];
    let tag_len = after_fence
        .find(|c: char| !(c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '+')))
        .unwrap_or(after_fence.len());
    let (tag, rest) = after_fence.split_at(tag_len);
    let body = if LANGUAGE_TAGS.iter().any(|known| known.eq_ignore_ascii_case(tag)) {
        rest
    } else {
        after_fence
    };

    match body.find(FENCE) {
        Some(close) => body[..close].trim().to_string(),
        None if open == 0 => body.trim().to_string(),
        // A stray fence after some text is not a code block: leave the text alone.
        None => trimmed.to_string(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // ── clean_sql_response ────────────────────────────────────

    #[test]
    fn clean_sql_plain() {
        assert_eq!(clean_sql_response("SELECT 1"), "SELECT 1");
    }

    #[test]
    fn clean_sql_with_fences() {
        assert_eq!(clean_sql_response("```sql\nSELECT 1\n```"), "SELECT 1");
    }

    #[test]
    fn clean_sql_with_generic_fences() {
        assert_eq!(clean_sql_response("```\nSELECT 1\n```"), "SELECT 1");
    }

    #[test]
    fn clean_sql_with_postgresql_fences() {
        assert_eq!(
            clean_sql_response("```postgresql\nSELECT 1\n```"),
            "SELECT 1"
        );
    }

    #[test]
    fn clean_sql_with_whitespace() {
        assert_eq!(clean_sql_response("  SELECT 1  "), "SELECT 1");
    }

    #[test]
    fn clean_sql_no_fences_multiline() {
        assert_eq!(
            clean_sql_response("SELECT\n  *\nFROM users"),
            "SELECT\n  *\nFROM users"
        );
    }

    #[test]
    fn clean_sql_other_language_tags() {
        assert_eq!(clean_sql_response("```pgsql\nSELECT 1\n```"), "SELECT 1");
        assert_eq!(clean_sql_response("```Sql\nSELECT 1\n```"), "SELECT 1");
    }

    #[test]
    fn clean_sql_text_around_block() {
        assert_eq!(
            clean_sql_response("Here it is:\n```sql\nSELECT 1\n```\nDone."),
            "SELECT 1"
        );
    }

    // ── parse_pg_array_text ───────────────────────────────────

    #[test]
    fn parse_array_plain_and_quoted() {
        assert_eq!(
            parse_pg_array_text("{active,inactive,\"on hold\"}"),
            vec!["active", "inactive", "on hold"]
        );
    }

    #[test]
    fn parse_array_backslash_escapes() {
        assert_eq!(
            parse_pg_array_text(r#"{"a\"b","c\\d"}"#),
            vec!["a\"b", "c\\d"]
        );
    }

    #[test]
    fn parse_array_empty() {
        assert!(parse_pg_array_text("{}").is_empty());
    }

    // ── simplify_default ──────────────────────────────────────

    #[test]
    fn simplify_default_sequences_and_timestamps() {
        assert_eq!(
            simplify_default("nextval('users_id_seq'::regclass)"),
            "auto-increment"
        );
        assert_eq!(simplify_default("CURRENT_TIMESTAMP"), "now()");
    }
}