Files
tusk/src-tauri/src/commands/chat_tools.rs
Aleksey Shakhmatov c03e887b71
Some checks failed
CI / lint-and-build (push) Failing after 1m15s
fix(chat): decode pg_class.reltuples as f32 in sample_data probe
pg_class.reltuples is `real` (FLOAT4). Reading it as f64 via query_scalar made
the sample_data tool fail with a sqlx type-mismatch ("f64 (FLOAT8) is not
compatible with FLOAT4") before any rows were fetched. Decode as f32 and widen.
2026-05-23 18:17:55 +03:00

1260 lines
44 KiB
Rust

//! Chat agent tool handlers (chat v2).
//!
//! Each `*_tool` function returns a plain string formatted for direct injection
//! into the LLM tool-result history. They reuse the schema helpers in
//! `commands::ai` and `commands::schema` rather than re-implementing SQL.
use crate::commands::ai::{
fetch_column_comments, fetch_columns, fetch_enum_types, fetch_foreign_keys_raw,
fetch_gp_table_extras, fetch_table_comments, fetch_unique_constraints, format_table_block,
ColumnInfo,
};
use crate::commands::connections::{load_connection_config, switch_database_core};
use crate::commands::queries::execute_query_core;
use crate::commands::saved_queries::{list_saved_queries_core, save_query_core};
use crate::commands::schema::{list_databases_core, list_tables_core};
use crate::db::sql_guard::ensure_readonly_sql;
use crate::error::{TuskError, TuskResult};
use crate::models::saved_queries::SavedQuery;
use crate::state::{AppState, CachedVec, DbFlavor};
use crate::utils::escape_ident;
use sqlx::{PgPool, Row};
use std::collections::{BTreeMap, HashMap};
use std::time::{Duration, Instant};
use tauri::AppHandle;
const TOOL_CACHE_TTL: Duration = Duration::from_secs(300);
const MAX_TABLES_PER_GET_COLUMNS: usize = 20;
const COLUMNS_TOOL_OUTPUT_CAP: usize = 15_000;
// ---------------------------------------------------------------------------
// list_databases
// ---------------------------------------------------------------------------
pub async fn list_databases_tool(state: &AppState, connection_id: &str) -> TuskResult<String> {
let dbs = list_databases_core(state, connection_id).await?;
let active = active_db_name(state, connection_id).await;
let mut out = format!("DATABASES ({}):", dbs.len());
for db in &dbs {
if Some(db) == active.as_ref() {
out.push_str(&format!("\n * {} (active)", db));
} else {
out.push_str(&format!("\n {}", db));
}
}
Ok(out)
}
// ---------------------------------------------------------------------------
// list_tables
// ---------------------------------------------------------------------------
pub async fn list_tables_tool(
app: &AppHandle,
state: &AppState,
connection_id: &str,
db: Option<&str>,
) -> TuskResult<String> {
let active = active_db_name(state, connection_id).await;
let target = db.map(|s| s.to_string()).or_else(|| active.clone());
let target_name = match target.as_deref() {
Some(n) => n.to_string(),
None => return Err(TuskError::Custom("No active database selected.".into())),
};
let same_as_active = active.as_deref() == Some(target_name.as_str());
let flavor = state.get_flavor(connection_id).await;
let table_names = match (flavor, same_as_active) {
(DbFlavor::ClickHouse, _) => list_tables_clickhouse(state, connection_id, &target_name).await?,
(_, true) => list_tables_active_pg(state, connection_id).await?,
(_, false) => list_tables_other_pg(app, state, connection_id, &target_name).await?,
};
let header = if same_as_active {
format!("TABLES IN ACTIVE DATABASE `{}` ({}):", target_name, table_names.len())
} else {
format!("TABLES IN DATABASE `{}` ({}):", target_name, table_names.len())
};
let body: Vec<String> = table_names.iter().map(|t| format!(" {}", t)).collect();
Ok(format!("{}\n{}", header, body.join("\n")))
}
async fn list_tables_active_pg(state: &AppState, connection_id: &str) -> TuskResult<Vec<String>> {
let schemas = crate::commands::schema::list_schemas_core(state, connection_id).await?;
let mut all: Vec<String> = Vec::new();
for schema in &schemas {
let tables = list_tables_core(state, connection_id, schema).await?;
for t in tables {
all.push(format!("{}.{}", schema, t.name));
}
}
Ok(all)
}
async fn list_tables_other_pg(
app: &AppHandle,
state: &AppState,
connection_id: &str,
target_db: &str,
) -> TuskResult<Vec<String>> {
let cache_key = (connection_id.to_string(), target_db.to_string());
if let Some(hit) = state.tables_by_db_cache.read().await.get(&cache_key).cloned() {
if hit.cached_at.elapsed() < TOOL_CACHE_TTL {
return Ok(hit.value);
}
}
let config = load_connection_config(app, connection_id)?;
let url = config.connection_url_for_db(target_db);
let pool = PgPool::connect(&url).await.map_err(|e| {
TuskError::Custom(format!(
"Could not connect to database '{}' on this server: {}",
target_db, e
))
})?;
let rows = sqlx::query(
"SELECT table_schema, table_name FROM information_schema.tables \
WHERE table_schema NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit') \
AND table_type = 'BASE TABLE' \
ORDER BY table_schema, table_name",
)
.fetch_all(&pool)
.await
.map_err(TuskError::Database)?;
pool.close().await;
let names: Vec<String> = rows
.iter()
.map(|r| format!("{}.{}", r.get::<String, _>(0), r.get::<String, _>(1)))
.collect();
state.tables_by_db_cache.write().await.insert(
cache_key,
CachedVec {
value: names.clone(),
cached_at: Instant::now(),
},
);
Ok(names)
}
async fn list_tables_clickhouse(
state: &AppState,
connection_id: &str,
target_db: &str,
) -> TuskResult<Vec<String>> {
let client = state.get_ch_client(connection_id).await?;
let escaped = target_db.replace('\\', "\\\\").replace('\'', "\\'");
let sql = format!(
"SELECT name FROM system.tables WHERE database = '{}' ORDER BY name",
escaped
);
let rows = client.fetch_objects(&sql).await?;
Ok(rows
.iter()
.filter_map(|r| r.get("name").and_then(|v| v.as_str()).map(String::from))
.collect())
}
// ---------------------------------------------------------------------------
// get_columns
// ---------------------------------------------------------------------------
pub async fn get_columns_tool(
state: &AppState,
connection_id: &str,
tables: &[String],
) -> TuskResult<String> {
if tables.is_empty() {
return Err(TuskError::Custom("get_columns requires at least one table.".into()));
}
if tables.len() > MAX_TABLES_PER_GET_COLUMNS {
return Err(TuskError::Custom(format!(
"Too many tables ({}); split into batches of ≤{}.",
tables.len(),
MAX_TABLES_PER_GET_COLUMNS
)));
}
let active_db = active_db_name(state, connection_id).await.unwrap_or_default();
// Normalise: accept "schema.table", "db.schema.table" (drop db if == active),
// and "table" (assume schema "public" for PG, or active DB for CH).
let parsed: Vec<(String, String, String)> = tables
.iter()
.map(|raw| normalise_table_ref(raw, &active_db))
.collect();
let flavor = state.get_flavor(connection_id).await;
if matches!(flavor, DbFlavor::ClickHouse) {
return get_columns_clickhouse(state, connection_id, &parsed).await;
}
get_columns_postgres(state, connection_id, &parsed).await
}
fn normalise_table_ref(raw: &str, active_db: &str) -> (String, String, String) {
// Returns (schema, table, original_input_for_diagnostics)
let trimmed = raw.trim().trim_matches('"').trim_matches('`');
let parts: Vec<&str> = trimmed.split('.').collect();
match parts.len() {
1 => ("public".to_string(), parts[0].to_string(), raw.to_string()),
2 => (parts[0].to_string(), parts[1].to_string(), raw.to_string()),
3 => {
// "db.schema.table" — drop db prefix when it matches active
let (db, schema, table) = (parts[0], parts[1], parts[2]);
if db == active_db {
(schema.to_string(), table.to_string(), raw.to_string())
} else {
// Different DB requested — let the caller surface a not-found warning.
// We still parse it as schema.table here.
(schema.to_string(), table.to_string(), raw.to_string())
}
}
_ => ("public".to_string(), trimmed.to_string(), raw.to_string()),
}
}
async fn get_columns_postgres(
state: &AppState,
connection_id: &str,
requested: &[(String, String, String)],
) -> TuskResult<String> {
let pool = state.get_pool(connection_id).await?;
let is_greenplum = matches!(state.get_flavor(connection_id).await, DbFlavor::Greenplum);
let gp_major = state.get_gp_major(connection_id).await.unwrap_or(7);
let (col_res, fk_res, enum_res, tbl_comm_res, col_comm_res, unique_res) = tokio::join!(
fetch_columns(&pool),
fetch_foreign_keys_raw(&pool),
fetch_enum_types(&pool),
fetch_table_comments(&pool),
fetch_column_comments(&pool),
fetch_unique_constraints(&pool),
);
let all_cols = col_res?;
let fk_rows = fk_res?;
let enum_map = enum_res.unwrap_or_default();
let tbl_comments = tbl_comm_res.unwrap_or_default();
let col_comments = col_comm_res.unwrap_or_default();
let uniques = unique_res.unwrap_or_default();
let gp_extras = if is_greenplum {
Some(fetch_gp_table_extras(&pool, gp_major).await)
} else {
None
};
// Build (schema, table) → Vec<ColumnInfo>
let mut by_table: BTreeMap<(String, String), Vec<ColumnInfo>> = BTreeMap::new();
for ci in &all_cols {
by_table
.entry((ci.schema.clone(), ci.table.clone()))
.or_default()
.push(ci.clone());
}
let mut fk_inline: HashMap<(String, String, String), String> = HashMap::new();
for fk in &fk_rows {
if fk.columns.len() == 1 && fk.ref_columns.len() == 1 {
fk_inline.insert(
(fk.schema.clone(), fk.table.clone(), fk.columns[0].clone()),
format!("{}.{}({})", fk.ref_schema, fk.ref_table, fk.ref_columns[0]),
);
}
}
let mut unique_map: HashMap<(String, String), Vec<String>> = HashMap::new();
for (schema, table, cols) in &uniques {
unique_map
.entry((schema.clone(), table.clone()))
.or_default()
.push(cols.join(", "));
}
let varchar_values: HashMap<(String, String, String), Vec<String>> = HashMap::new();
let jsonb_keys: HashMap<(String, String, String), Vec<String>> = HashMap::new();
let mut output: Vec<String> = Vec::new();
let mut not_found: Vec<String> = Vec::new();
for (schema, table, raw) in requested {
match by_table.get(&(schema.clone(), table.clone())) {
Some(cols) => {
let full_name = format!("{}.{}", schema, table);
format_table_block(
&full_name,
cols,
&tbl_comments,
&col_comments,
&fk_inline,
&enum_map,
&unique_map,
&varchar_values,
&jsonb_keys,
gp_extras.as_ref(),
&mut output,
);
}
None => not_found.push(raw.clone()),
}
}
if !not_found.is_empty() {
let nearest = nearest_table_matches(&by_table, &not_found);
let header = format!(
"WARNING: tables not found: {}.{}",
not_found.join(", "),
if nearest.is_empty() {
String::new()
} else {
format!(" Nearest matches: {}.", nearest.join(", "))
}
);
output.insert(0, header);
output.insert(1, String::new());
}
let mut text = output.join("\n");
if text.len() > COLUMNS_TOOL_OUTPUT_CAP {
text.truncate(COLUMNS_TOOL_OUTPUT_CAP);
text.push_str("\n... (output truncated)");
}
Ok(text)
}
async fn get_columns_clickhouse(
state: &AppState,
connection_id: &str,
requested: &[(String, String, String)],
) -> TuskResult<String> {
let client = state.get_ch_client(connection_id).await?;
let active_db = client.database.clone();
let where_terms: Vec<String> = requested
.iter()
.map(|(schema, table, _)| {
// For CH, treat the parsed "schema" as the database name; if it equals
// a PG-conventional default ("public"), substitute with active CH database.
let dbn = if schema == "public" { active_db.clone() } else { schema.clone() };
format!(
"(database = '{}' AND name = '{}')",
dbn.replace('\'', "\\'"),
table.replace('\'', "\\'")
)
})
.collect();
let where_clause = where_terms.join(" OR ");
let sql = format!(
"SELECT database, table, name, type, default_expression, is_in_primary_key, comment, position \
FROM system.columns WHERE {} ORDER BY database, table, position",
where_clause
);
let rows = client.fetch_objects(&sql).await?;
// Group by (database, table)
let mut grouped: BTreeMap<(String, String), Vec<&serde_json::Map<String, serde_json::Value>>> =
BTreeMap::new();
for row in &rows {
let dbn = row.get("database").and_then(|v| v.as_str()).unwrap_or("").to_string();
let tbl = row.get("table").and_then(|v| v.as_str()).unwrap_or("").to_string();
grouped.entry((dbn, tbl)).or_default().push(row);
}
// Track which requested tables were found
let mut output = String::new();
let mut not_found: Vec<String> = Vec::new();
for (schema, table, raw) in requested {
let dbn = if schema == "public" { active_db.clone() } else { schema.clone() };
match grouped.get(&(dbn.clone(), table.clone())) {
Some(cols) => {
output.push_str(&format!("\nTABLE {}.{}\n", dbn, table));
for col in cols {
let name = col.get("name").and_then(|v| v.as_str()).unwrap_or("");
let dtype = col.get("type").and_then(|v| v.as_str()).unwrap_or("");
let is_pk = matches!(
col.get("is_in_primary_key"),
Some(serde_json::Value::Number(n)) if n.as_i64() == Some(1)
) || matches!(
col.get("is_in_primary_key"),
Some(serde_json::Value::String(s)) if s == "1"
);
let default = col.get("default_expression").and_then(|v| v.as_str()).unwrap_or("");
let comment = col.get("comment").and_then(|v| v.as_str()).unwrap_or("");
let mut line = format!(" {} {}", name, dtype);
if is_pk {
line.push_str(" [PK]");
}
if !default.is_empty() {
line.push_str(&format!(" DEFAULT {}", default));
}
if !comment.is_empty() {
line.push_str(&format!(" -- {}", comment));
}
output.push_str(&line);
output.push('\n');
}
}
None => not_found.push(raw.clone()),
}
}
let mut header = String::new();
if !not_found.is_empty() {
header.push_str(&format!(
"WARNING: tables not found: {}\n\n",
not_found.join(", ")
));
}
let mut combined = format!("{}{}", header, output.trim_start());
if combined.len() > COLUMNS_TOOL_OUTPUT_CAP {
combined.truncate(COLUMNS_TOOL_OUTPUT_CAP);
combined.push_str("\n... (output truncated)");
}
Ok(combined)
}
fn nearest_table_matches(
by_table: &BTreeMap<(String, String), Vec<ColumnInfo>>,
missing: &[String],
) -> Vec<String> {
let all: Vec<String> = by_table
.keys()
.map(|(s, t)| format!("{}.{}", s, t))
.collect();
let mut hints: Vec<String> = Vec::new();
for m in missing {
let needle = m.to_lowercase();
let mut candidates: Vec<&String> = all
.iter()
.filter(|n| {
let lower = n.to_lowercase();
lower.contains(&needle) || needle.contains(lower.split('.').last().unwrap_or(""))
})
.take(3)
.collect();
candidates.dedup();
for c in candidates {
if !hints.contains(c) {
hints.push(c.clone());
}
}
}
hints
}
// ---------------------------------------------------------------------------
// switch_database
// ---------------------------------------------------------------------------
pub async fn switch_database_tool(
app: &AppHandle,
state: &AppState,
connection_id: &str,
target_db: &str,
) -> TuskResult<String> {
let config = load_connection_config(app, connection_id)?;
// Verify target exists in cluster
let dbs = list_databases_core(state, connection_id).await?;
if !dbs.iter().any(|d| d == target_db) {
return Err(TuskError::Custom(format!(
"Database '{}' does not exist on this server. Available: {}",
target_db,
dbs.join(", ")
)));
}
switch_database_core(state, &config, target_db).await?;
Ok(format!("Switched active database to '{}'.", target_db))
}
// ---------------------------------------------------------------------------
// helpers
// ---------------------------------------------------------------------------
async fn active_db_name(state: &AppState, connection_id: &str) -> Option<String> {
let flavor = state.get_flavor(connection_id).await;
if matches!(flavor, DbFlavor::ClickHouse) {
return state
.get_ch_client(connection_id)
.await
.ok()
.map(|c| c.database.clone());
}
let pool = state.get_pool(connection_id).await.ok()?;
sqlx::query_scalar::<_, String>("SELECT current_database()")
.fetch_one(&pool)
.await
.ok()
}
// ---------------------------------------------------------------------------
// save_query / find_queries (chat v3 — F2)
// ---------------------------------------------------------------------------
const FIND_QUERIES_LIMIT: usize = 10;
const FIND_QUERIES_SQL_PREVIEW_CHARS: usize = 500;
pub async fn save_query_tool(
app: &AppHandle,
connection_id: &str,
name: &str,
sql: &str,
) -> TuskResult<String> {
let trimmed_name = name.trim();
let trimmed_sql = sql.trim();
if trimmed_name.is_empty() {
return Err(TuskError::Custom("save_query: name must not be empty".into()));
}
if trimmed_sql.is_empty() {
return Err(TuskError::Custom("save_query: sql must not be empty".into()));
}
let entry = SavedQuery {
id: uuid::Uuid::new_v4().to_string(),
name: trimmed_name.to_string(),
sql: trimmed_sql.to_string(),
connection_id: Some(connection_id.to_string()),
created_at: chrono::Utc::now().to_rfc3339(),
};
save_query_core(app, entry).await?;
Ok(format!("Saved query \"{}\" — visible in sidebar → Saved.", trimmed_name))
}
pub async fn find_queries_tool(
app: &AppHandle,
connection_id: &str,
text: &str,
) -> TuskResult<String> {
let trimmed = text.trim();
if trimmed.is_empty() {
return Err(TuskError::Custom("find_queries: text must not be empty".into()));
}
let all = list_saved_queries_core(app, Some(trimmed)).await?;
let matches: Vec<SavedQuery> = all
.into_iter()
.filter(|q| q.connection_id.as_deref() == Some(connection_id))
.take(FIND_QUERIES_LIMIT)
.collect();
if matches.is_empty() {
return Ok(format!(
"No saved queries match \"{}\" for this connection.",
trimmed
));
}
let mut out = format!(
"Saved queries matching \"{}\" ({}):",
trimmed,
matches.len()
);
for q in &matches {
let sql_preview: String = if q.sql.chars().count() > FIND_QUERIES_SQL_PREVIEW_CHARS {
let truncated: String = q.sql.chars().take(FIND_QUERIES_SQL_PREVIEW_CHARS).collect();
format!("{}", truncated)
} else {
q.sql.clone()
};
out.push_str(&format!(
"\n\n[{}] {}\n{}",
q.created_at, q.name, sql_preview
));
}
Ok(out)
}
// ---------------------------------------------------------------------------
// profile_table (PR2 — data-engineering tool)
// ---------------------------------------------------------------------------
const PROFILE_TABLE_MAX_COLUMNS: usize = 30;
const PROFILE_TABLE_TOPK: usize = 5;
pub async fn profile_table_tool(
state: &AppState,
connection_id: &str,
table: &str,
) -> TuskResult<String> {
let active_db = active_db_name(state, connection_id).await.unwrap_or_default();
let (schema, tbl, _raw) = normalise_table_ref(table, &active_db);
let flavor = state.get_flavor(connection_id).await;
match flavor {
DbFlavor::PostgreSQL | DbFlavor::Greenplum => {
profile_table_postgres(state, connection_id, &schema, &tbl).await
}
DbFlavor::ClickHouse => profile_table_clickhouse(state, connection_id, &schema, &tbl).await,
}
}
async fn profile_table_postgres(
state: &AppState,
connection_id: &str,
schema: &str,
table: &str,
) -> TuskResult<String> {
let pool = state.get_pool(connection_id).await?;
let exists = sqlx::query_scalar::<_, i64>(
"SELECT 1 FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE n.nspname = $1 AND c.relname = $2 LIMIT 1",
)
.bind(schema)
.bind(table)
.fetch_optional(&pool)
.await
.map_err(TuskError::Database)?;
if exists.is_none() {
return Err(TuskError::Custom(format!(
"Table '{}.{}' does not exist (or no privileges).",
schema, table
)));
}
let last_analyze: Option<chrono::DateTime<chrono::Utc>> = sqlx::query_scalar(
"SELECT GREATEST(last_analyze, last_autoanalyze) FROM pg_stat_user_tables \
WHERE schemaname = $1 AND relname = $2",
)
.bind(schema)
.bind(table)
.fetch_optional(&pool)
.await
.ok()
.flatten();
let stat_rows = sqlx::query(
"SELECT attname, null_frac, n_distinct, \
most_common_vals::text, most_common_freqs, histogram_bounds::text \
FROM pg_stats \
WHERE schemaname = $1 AND tablename = $2 \
ORDER BY attname",
)
.bind(schema)
.bind(table)
.fetch_all(&pool)
.await
.map_err(TuskError::Database)?;
let mut out = format!("PROFILE {}.{}\n", schema, table);
match last_analyze {
Some(ts) => out.push_str(&format!("Last ANALYZE: {}\n", ts.to_rfc3339())),
None => out.push_str("Last ANALYZE: never\n"),
}
if stat_rows.is_empty() {
out.push_str(&format!(
"\nNo statistics in pg_stats. Run: ANALYZE {}.{};\n",
escape_ident(schema),
escape_ident(table)
));
return Ok(out);
}
let total = stat_rows.len();
let take = total.min(PROFILE_TABLE_MAX_COLUMNS);
out.push_str(&format!("\n{} columns with stats\n", total));
for r in stat_rows.iter().take(take) {
let attname: String = r.get(0);
let null_frac: f32 = r.try_get(1).unwrap_or(0.0);
let n_distinct: f32 = r.try_get(2).unwrap_or(0.0);
let mcv_text: Option<String> = r.try_get(3).ok();
let mcf_arr: Option<Vec<f32>> = r.try_get(4).ok();
let hist_text: Option<String> = r.try_get(5).ok();
out.push_str(&format!("\n {}:\n", attname));
out.push_str(&format!(" null_frac: {:.4}\n", null_frac));
if n_distinct < 0.0 {
out.push_str(&format!(
" n_distinct: {:.3} (ratio of total rows)\n",
-n_distinct
));
} else {
out.push_str(&format!(" n_distinct: {}\n", n_distinct as i64));
}
if let Some(text) = hist_text.as_deref() {
let bounds = parse_pg_array_text_local(text);
if let (Some(min), Some(max)) = (bounds.first(), bounds.last()) {
out.push_str(&format!(" range: {}{}\n", min, max));
}
}
if let Some(text) = mcv_text.as_deref() {
let vals = parse_pg_array_text_local(text);
if !vals.is_empty() {
let freqs = mcf_arr.unwrap_or_default();
let pairs: Vec<String> = vals
.iter()
.take(PROFILE_TABLE_TOPK)
.enumerate()
.map(|(i, v)| match freqs.get(i) {
Some(f) => format!("{}({:.3})", v, f),
None => v.clone(),
})
.collect();
out.push_str(&format!(" top: {}\n", pairs.join(", ")));
}
}
}
if total > take {
out.push_str(&format!("\n…and {} more columns\n", total - take));
}
Ok(out)
}
/// Local pg-array parser used by profile_table; mirrors `parse_pg_array_text` in ai.rs
/// but kept local to avoid importing a private helper.
fn parse_pg_array_text_local(s: &str) -> Vec<String> {
let s = s.trim();
let s = s.strip_prefix('{').unwrap_or(s);
let s = s.strip_suffix('}').unwrap_or(s);
if s.is_empty() {
return Vec::new();
}
let mut out = Vec::new();
let mut cur = String::new();
let mut in_quotes = false;
let mut chars = s.chars().peekable();
while let Some(c) = chars.next() {
match c {
'"' if !in_quotes => in_quotes = true,
'"' if in_quotes => {
if chars.peek() == Some(&'"') {
cur.push('"');
chars.next();
} else {
in_quotes = false;
}
}
',' if !in_quotes => {
out.push(std::mem::take(&mut cur));
}
'\\' if in_quotes => {
if let Some(next) = chars.next() {
cur.push(next);
}
}
other => cur.push(other),
}
}
if !cur.is_empty() || s.ends_with(',') {
out.push(cur);
}
out
}
async fn profile_table_clickhouse(
state: &AppState,
connection_id: &str,
schema: &str,
table: &str,
) -> TuskResult<String> {
let client = state.get_ch_client(connection_id).await?;
let active_db = client.database.clone();
let dbn = if schema == "public" || schema.is_empty() {
active_db
} else {
schema.to_string()
};
let cols_sql = format!(
"SELECT name, type FROM system.columns \
WHERE database = '{}' AND table = '{}' \
ORDER BY position LIMIT {}",
dbn.replace('\'', "\\'"),
table.replace('\'', "\\'"),
PROFILE_TABLE_MAX_COLUMNS
);
let col_rows = client.fetch_objects(&cols_sql).await?;
if col_rows.is_empty() {
return Err(TuskError::Custom(format!(
"Table '{}.{}' does not exist (or no privileges).",
dbn, table
)));
}
let mut select_parts: Vec<String> = vec!["count() AS rows_total".to_string()];
let mut col_names: Vec<String> = Vec::new();
let mut col_types: Vec<String> = Vec::new();
for r in &col_rows {
let name = r.get("name").and_then(|v| v.as_str()).unwrap_or("").to_string();
let dtype = r.get("type").and_then(|v| v.as_str()).unwrap_or("").to_string();
if name.is_empty() {
continue;
}
col_names.push(name.clone());
col_types.push(dtype);
let q = name.replace('`', "``");
select_parts.push(format!("countIf(`{}` IS NULL) AS null_{}", q, col_names.len()));
select_parts.push(format!("uniqHLL12(`{}`) AS dist_{}", q, col_names.len()));
select_parts.push(format!("toString(min(`{}`)) AS min_{}", q, col_names.len()));
select_parts.push(format!("toString(max(`{}`)) AS max_{}", q, col_names.len()));
select_parts.push(format!(
"arrayStringConcat(arrayMap(x -> toString(x), topK({})(`{}`)), '|') AS top_{}",
PROFILE_TABLE_TOPK,
q,
col_names.len()
));
}
let agg_sql = format!(
"SELECT {} FROM `{}`.`{}`",
select_parts.join(", "),
dbn.replace('`', "``"),
table.replace('`', "``")
);
let agg_rows = client.fetch_objects(&agg_sql).await?;
let row = agg_rows
.first()
.ok_or_else(|| TuskError::Custom("ClickHouse returned no row for profile aggregate".into()))?;
let rows_total = row
.get("rows_total")
.and_then(|v| v.as_str().and_then(|s| s.parse::<i64>().ok()).or_else(|| v.as_i64()))
.unwrap_or(0);
let mut out = format!(
"PROFILE {}.{}\nRows: {}\n{} columns profiled\n",
dbn,
table,
rows_total,
col_names.len()
);
for (i, name) in col_names.iter().enumerate() {
let n = i + 1;
let nulls = row
.get(&format!("null_{}", n))
.and_then(|v| v.as_str().and_then(|s| s.parse::<i64>().ok()).or_else(|| v.as_i64()))
.unwrap_or(0);
let dist = row
.get(&format!("dist_{}", n))
.and_then(|v| v.as_str().and_then(|s| s.parse::<i64>().ok()).or_else(|| v.as_i64()))
.unwrap_or(0);
let min = row.get(&format!("min_{}", n)).and_then(|v| v.as_str()).unwrap_or("");
let max = row.get(&format!("max_{}", n)).and_then(|v| v.as_str()).unwrap_or("");
let top_raw = row.get(&format!("top_{}", n)).and_then(|v| v.as_str()).unwrap_or("");
out.push_str(&format!("\n {} ({}):\n", name, col_types[i]));
let null_frac = if rows_total > 0 {
nulls as f64 / rows_total as f64
} else {
0.0
};
out.push_str(&format!(" null_frac: {:.4}\n", null_frac));
out.push_str(&format!(" distinct (HLL): {}\n", dist));
if !min.is_empty() || !max.is_empty() {
out.push_str(&format!(" range: {}{}\n", min, max));
}
if !top_raw.is_empty() {
let top_vals: Vec<&str> = top_raw.split('|').take(PROFILE_TABLE_TOPK).collect();
out.push_str(&format!(" top: {}\n", top_vals.join(", ")));
}
}
if col_rows.len() == PROFILE_TABLE_MAX_COLUMNS {
out.push_str(&format!(
"\n…showing first {} columns\n",
PROFILE_TABLE_MAX_COLUMNS
));
}
Ok(out)
}
// ---------------------------------------------------------------------------
// sample_data (PR2 — returns SQL string; dispatch site runs it through
// execute_query_core so the QueryResult feeds the standard renderer)
// ---------------------------------------------------------------------------
pub async fn build_sample_sql(
state: &AppState,
connection_id: &str,
table: &str,
limit: u32,
) -> TuskResult<String> {
let active_db = active_db_name(state, connection_id).await.unwrap_or_default();
let (schema, tbl, _raw) = normalise_table_ref(table, &active_db);
let flavor = state.get_flavor(connection_id).await;
match flavor {
DbFlavor::PostgreSQL | DbFlavor::Greenplum => {
build_sample_sql_postgres(state, connection_id, &schema, &tbl, limit).await
}
DbFlavor::ClickHouse => {
build_sample_sql_clickhouse(state, connection_id, &schema, &tbl, limit).await
}
}
}
async fn build_sample_sql_postgres(
state: &AppState,
connection_id: &str,
schema: &str,
table: &str,
limit: u32,
) -> TuskResult<String> {
let pool = state.get_pool(connection_id).await?;
// pg_class.reltuples is `real` (FLOAT4); decode as f32 then widen — sqlx is
// strict and reading it directly as f64 fails with a type-mismatch error.
let reltuples: f64 = sqlx::query_scalar::<_, f32>(
"SELECT c.reltuples FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE n.nspname = $1 AND c.relname = $2",
)
.bind(schema)
.bind(table)
.fetch_optional(&pool)
.await
.map_err(TuskError::Database)?
.unwrap_or(0.0) as f64;
let qualified = format!("{}.{}", escape_ident(schema), escape_ident(table));
if reltuples > 0.0 {
let target = limit as f64 * 100.0 / reltuples;
let percent = target.clamp(0.01, 100.0);
Ok(format!(
"SELECT * FROM {} TABLESAMPLE BERNOULLI({:.4}) LIMIT {}",
qualified, percent, limit
))
} else {
Ok(format!(
"SELECT * FROM {} ORDER BY random() LIMIT {}",
qualified, limit
))
}
}
async fn build_sample_sql_clickhouse(
state: &AppState,
connection_id: &str,
schema: &str,
table: &str,
limit: u32,
) -> TuskResult<String> {
let client = state.get_ch_client(connection_id).await?;
let active_db = client.database.clone();
let dbn = if schema == "public" || schema.is_empty() {
active_db
} else {
schema.to_string()
};
let info_sql = format!(
"SELECT engine, sampling_key FROM system.tables \
WHERE database = '{}' AND name = '{}' LIMIT 1",
dbn.replace('\'', "\\'"),
table.replace('\'', "\\'")
);
let rows = client.fetch_objects(&info_sql).await.unwrap_or_default();
let (engine, sampling_key) = match rows.first() {
Some(r) => (
r.get("engine").and_then(|v| v.as_str()).unwrap_or("").to_string(),
r.get("sampling_key").and_then(|v| v.as_str()).unwrap_or("").to_string(),
),
None => (String::new(), String::new()),
};
let qualified = format!(
"`{}`.`{}`",
dbn.replace('`', "``"),
table.replace('`', "``")
);
if engine.starts_with("Merge") && !sampling_key.trim().is_empty() {
Ok(format!(
"SELECT * FROM {} SAMPLE 0.01 LIMIT {}",
qualified, limit
))
} else {
Ok(format!(
"SELECT * FROM {} ORDER BY rand() LIMIT {}",
qualified, limit
))
}
}
// ---------------------------------------------------------------------------
// explain_query (PR2)
// ---------------------------------------------------------------------------
pub async fn explain_query_tool(
state: &AppState,
connection_id: &str,
sql: &str,
) -> TuskResult<String> {
let trimmed = sql.trim();
if trimmed.is_empty() {
return Err(TuskError::Custom("explain_query: sql must not be empty".into()));
}
// Validate the user's statement BEFORE prefixing EXPLAIN so the error message
// references their SQL, not the wrapper. ensure_readonly_sql also rejects any
// forbidden keywords (INSERT/UPDATE/DELETE/...) even nested under EXPLAIN.
ensure_readonly_sql(trimmed).map_err(|e| TuskError::Custom(e.to_string()))?;
let flavor = state.get_flavor(connection_id).await;
match flavor {
DbFlavor::PostgreSQL | DbFlavor::Greenplum => {
explain_query_postgres(state, connection_id, trimmed).await
}
DbFlavor::ClickHouse => explain_query_clickhouse(state, connection_id, trimmed).await,
}
}
async fn explain_query_postgres(
state: &AppState,
connection_id: &str,
sql: &str,
) -> TuskResult<String> {
let pool = state.get_pool(connection_id).await?;
let plan_sql = format!("EXPLAIN (FORMAT JSON, ANALYZE, BUFFERS) {}", sql);
let mut tx = pool.begin().await.map_err(TuskError::Database)?;
sqlx::query("SET TRANSACTION READ ONLY")
.execute(&mut *tx)
.await
.map_err(TuskError::Database)?;
let row = sqlx::query(&plan_sql)
.fetch_one(&mut *tx)
.await
.map_err(TuskError::Database)?;
let _ = tx.rollback().await;
let raw_json: serde_json::Value = match row.try_get::<serde_json::Value, _>(0) {
Ok(v) => v,
Err(_) => {
let s: String = row.try_get(0).map_err(TuskError::Database)?;
serde_json::from_str(&s)
.map_err(|e| TuskError::Custom(format!("EXPLAIN JSON parse failed: {}", e)))?
}
};
let plans = raw_json
.as_array()
.ok_or_else(|| TuskError::Custom("EXPLAIN JSON: expected array".into()))?;
let plan = plans.first().and_then(|p| p.get("Plan")).ok_or_else(|| {
TuskError::Custom("EXPLAIN JSON: missing top-level Plan node".into())
})?;
let root_node = plan.get("Node Type").and_then(|v| v.as_str()).unwrap_or("?");
let total_cost = plan.get("Total Cost").and_then(|v| v.as_f64()).unwrap_or(0.0);
let planning = plans
.first()
.and_then(|p| p.get("Planning Time").and_then(|v| v.as_f64()))
.unwrap_or(0.0);
let execution = plans
.first()
.and_then(|p| p.get("Execution Time").and_then(|v| v.as_f64()))
.unwrap_or(0.0);
let mut seq_scans: Vec<String> = Vec::new();
let mut spilled: Vec<String> = Vec::new();
let mut motions: Vec<String> = Vec::new();
let mut max_skew: Option<(f64, String)> = None;
walk_pg_plan(plan, &mut seq_scans, &mut spilled, &mut motions, &mut max_skew);
let mut out = format!(
"PLAN root: {}, total cost {:.1}\nPlanning: {:.2} ms Execution: {:.2} ms\n",
root_node, total_cost, planning, execution
);
if !seq_scans.is_empty() {
out.push_str(&format!("Seq scans on: {}\n", seq_scans.join(", ")));
}
if !spilled.is_empty() {
out.push_str(&format!("Spilled to disk: {}\n", spilled.join(", ")));
}
if !motions.is_empty() {
out.push_str(&format!("Motions (Greenplum): {}\n", motions.join(", ")));
}
if let Some((ratio, node)) = max_skew {
if ratio >= 5.0 {
out.push_str(&format!(
"Estimate skew: max plan/actual ratio = {:.1} on {}\n",
ratio, node
));
}
}
if seq_scans.is_empty() && spilled.is_empty() && motions.is_empty() {
out.push_str("No obvious red flags.\n");
}
Ok(out)
}
fn walk_pg_plan(
node: &serde_json::Value,
seq_scans: &mut Vec<String>,
spilled: &mut Vec<String>,
motions: &mut Vec<String>,
max_skew: &mut Option<(f64, String)>,
) {
let node_type = node.get("Node Type").and_then(|v| v.as_str()).unwrap_or("");
if node_type == "Seq Scan" {
let rel = node
.get("Relation Name")
.and_then(|v| v.as_str())
.unwrap_or("?");
let schema = node
.get("Schema")
.and_then(|v| v.as_str())
.map(|s| format!("{}.", s))
.unwrap_or_default();
seq_scans.push(format!("{}{}", schema, rel));
}
if let Some(method) = node.get("Sort Method").and_then(|v| v.as_str()) {
if method.contains("disk") || method.contains("external") {
spilled.push(format!("Sort ({})", method));
}
}
if node_type.contains("Motion") {
motions.push(node_type.to_string());
}
let plan_rows = node.get("Plan Rows").and_then(|v| v.as_f64()).unwrap_or(0.0);
let actual_rows = node.get("Actual Rows").and_then(|v| v.as_f64()).unwrap_or(0.0);
if actual_rows > 0.0 && plan_rows > 0.0 {
let ratio = (plan_rows / actual_rows).max(actual_rows / plan_rows);
if max_skew.as_ref().map(|(r, _)| ratio > *r).unwrap_or(true) {
*max_skew = Some((ratio, node_type.to_string()));
}
}
if let Some(children) = node.get("Plans").and_then(|v| v.as_array()) {
for child in children {
walk_pg_plan(child, seq_scans, spilled, motions, max_skew);
}
}
}
async fn explain_query_clickhouse(
state: &AppState,
connection_id: &str,
sql: &str,
) -> TuskResult<String> {
let client = state.get_ch_client(connection_id).await?;
let plan_sql = format!("EXPLAIN PLAN {}", sql);
let qr = client.execute_query(&plan_sql, true).await?;
if qr.rows.is_empty() {
return Ok("(empty plan)".to_string());
}
let mut out = String::from("ClickHouse plan:\n");
for row in &qr.rows {
if let Some(cell) = row.first() {
if let Some(s) = cell.as_str() {
out.push_str(s);
out.push('\n');
}
}
}
Ok(out)
}
// ---------------------------------------------------------------------------
// detect_skew (PR2 — Greenplum-only)
// ---------------------------------------------------------------------------
pub async fn detect_skew_tool(
state: &AppState,
connection_id: &str,
table: &str,
) -> TuskResult<String> {
let flavor = state.get_flavor(connection_id).await;
if !matches!(flavor, DbFlavor::Greenplum) {
return Ok("detect_skew is only available on Greenplum connections.".to_string());
}
let active_db = active_db_name(state, connection_id).await.unwrap_or_default();
let (schema, tbl, _raw) = normalise_table_ref(table, &active_db);
let qualified = format!("{}.{}", escape_ident(&schema), escape_ident(&tbl));
let sql = format!(
"SELECT gp_segment_id, COUNT(*) AS n FROM {} GROUP BY 1 ORDER BY 1",
qualified
);
let qr = execute_query_core(state, connection_id, &sql).await?;
let mut counts: Vec<(i64, i64)> = Vec::new();
for row in &qr.rows {
let seg = row
.get(0)
.and_then(|v| v.as_i64().or_else(|| v.as_str().and_then(|s| s.parse().ok())))
.unwrap_or(0);
let n = row
.get(1)
.and_then(|v| v.as_i64().or_else(|| v.as_str().and_then(|s| s.parse().ok())))
.unwrap_or(0);
counts.push((seg, n));
}
if counts.is_empty() {
return Ok(format!("Table {}.{} is empty.", schema, tbl));
}
let total: i64 = counts.iter().map(|(_, n)| *n).sum();
let max = counts.iter().map(|(_, n)| *n).max().unwrap_or(0);
let min = counts.iter().map(|(_, n)| *n).min().unwrap_or(0);
let avg = total as f64 / counts.len() as f64;
let ratio = if avg > 0.0 { max as f64 / avg } else { 0.0 };
let mut out = format!(
"Per-segment row distribution for {}.{}\nsegments: {} total rows: {}\nmin: {} max: {} avg: {:.0}\nskew ratio (max/avg): {:.2}",
schema,
tbl,
counts.len(),
total,
min,
max,
avg,
ratio
);
if ratio > 1.5 {
out.push_str(" ⚠ uneven distribution\n");
} else {
out.push_str(" OK — within 1.5x of average\n");
}
let pool = state.get_pool(connection_id).await?;
if let Some(policy) = fetch_gp_distribution_for(&pool, &schema, &tbl).await {
out.push_str(&format!("\nCurrent policy: {}\n", policy));
if ratio > 1.5 {
out.push_str(
"Hint: pick a higher-cardinality column. Run profile_table to compare n_distinct.\n",
);
}
}
Ok(out)
}
/// Fetch the Greenplum DISTRIBUTED BY policy for a single table. Returns None if
/// the catalog query fails (non-GP connection, missing privileges, etc.).
async fn fetch_gp_distribution_for(
pool: &PgPool,
schema: &str,
table: &str,
) -> Option<String> {
let row = sqlx::query(
"SELECT COALESCE(\
(SELECT array_agg(a.attname ORDER BY ord.idx) \
FROM regexp_split_to_table(NULLIF(trim(p.distkey::text), ''), ' ') \
WITH ORDINALITY AS ord(attnum_str, idx) \
JOIN pg_attribute a \
ON a.attrelid = c.oid \
AND a.attnum::int = ord.attnum_str::int), \
ARRAY[]::text[] \
) AS dist_columns \
FROM gp_distribution_policy p \
JOIN pg_class c ON p.localoid = c.oid \
JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE n.nspname = $1 AND c.relname = $2",
)
.bind(schema)
.bind(table)
.fetch_optional(pool)
.await
.ok()
.flatten()?;
let cols: Vec<String> = row.try_get(0).ok()?;
Some(if cols.is_empty() {
"DISTRIBUTED RANDOMLY".to_string()
} else {
format!("DISTRIBUTED BY ({})", cols.join(", "))
})
}