refactor(ai): consolidate AI around chat tool-calling; add OpenRouter
- rework chat backend (chat.rs, chat_tools.rs, ai.rs, models, state) around tool calls - add OpenRouter provider alongside Ollama/Fireworks in settings - drop inline AiBar, ResultsPanel explain/fix UI and ChartPreview in favour of the chat panel - add frontend chat tool-registry
This commit is contained in:
@@ -10,11 +10,14 @@ use crate::commands::ai::{
|
||||
ColumnInfo,
|
||||
};
|
||||
use crate::commands::connections::{load_connection_config, switch_database_core};
|
||||
use crate::commands::queries::execute_query_core;
|
||||
use crate::commands::saved_queries::{list_saved_queries_core, save_query_core};
|
||||
use crate::commands::schema::{list_databases_core, list_tables_core};
|
||||
use crate::db::sql_guard::ensure_readonly_sql;
|
||||
use crate::error::{TuskError, TuskResult};
|
||||
use crate::models::saved_queries::SavedQuery;
|
||||
use crate::state::{AppState, CachedVec, DbFlavor};
|
||||
use crate::utils::escape_ident;
|
||||
use sqlx::{PgPool, Row};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -565,3 +568,690 @@ pub async fn find_queries_tool(
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// profile_table (PR2 — data-engineering tool)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Upper bound on how many columns a single table profile reports.
const PROFILE_TABLE_MAX_COLUMNS: usize = 30;
|
||||
/// How many most-frequent values are listed per column in a profile.
const PROFILE_TABLE_TOPK: usize = 5;
|
||||
|
||||
pub async fn profile_table_tool(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
table: &str,
|
||||
) -> TuskResult<String> {
|
||||
let active_db = active_db_name(state, connection_id).await.unwrap_or_default();
|
||||
let (schema, tbl, _raw) = normalise_table_ref(table, &active_db);
|
||||
let flavor = state.get_flavor(connection_id).await;
|
||||
match flavor {
|
||||
DbFlavor::PostgreSQL | DbFlavor::Greenplum => {
|
||||
profile_table_postgres(state, connection_id, &schema, &tbl).await
|
||||
}
|
||||
DbFlavor::ClickHouse => profile_table_clickhouse(state, connection_id, &schema, &tbl).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn profile_table_postgres(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
) -> TuskResult<String> {
|
||||
let pool = state.get_pool(connection_id).await?;
|
||||
|
||||
let exists = sqlx::query_scalar::<_, i64>(
|
||||
"SELECT 1 FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid \
|
||||
WHERE n.nspname = $1 AND c.relname = $2 LIMIT 1",
|
||||
)
|
||||
.bind(schema)
|
||||
.bind(table)
|
||||
.fetch_optional(&pool)
|
||||
.await
|
||||
.map_err(TuskError::Database)?;
|
||||
if exists.is_none() {
|
||||
return Err(TuskError::Custom(format!(
|
||||
"Table '{}.{}' does not exist (or no privileges).",
|
||||
schema, table
|
||||
)));
|
||||
}
|
||||
|
||||
let last_analyze: Option<chrono::DateTime<chrono::Utc>> = sqlx::query_scalar(
|
||||
"SELECT GREATEST(last_analyze, last_autoanalyze) FROM pg_stat_user_tables \
|
||||
WHERE schemaname = $1 AND relname = $2",
|
||||
)
|
||||
.bind(schema)
|
||||
.bind(table)
|
||||
.fetch_optional(&pool)
|
||||
.await
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
let stat_rows = sqlx::query(
|
||||
"SELECT attname, null_frac, n_distinct, \
|
||||
most_common_vals::text, most_common_freqs, histogram_bounds::text \
|
||||
FROM pg_stats \
|
||||
WHERE schemaname = $1 AND tablename = $2 \
|
||||
ORDER BY attname",
|
||||
)
|
||||
.bind(schema)
|
||||
.bind(table)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(TuskError::Database)?;
|
||||
|
||||
let mut out = format!("PROFILE {}.{}\n", schema, table);
|
||||
match last_analyze {
|
||||
Some(ts) => out.push_str(&format!("Last ANALYZE: {}\n", ts.to_rfc3339())),
|
||||
None => out.push_str("Last ANALYZE: never\n"),
|
||||
}
|
||||
|
||||
if stat_rows.is_empty() {
|
||||
out.push_str(&format!(
|
||||
"\nNo statistics in pg_stats. Run: ANALYZE {}.{};\n",
|
||||
escape_ident(schema),
|
||||
escape_ident(table)
|
||||
));
|
||||
return Ok(out);
|
||||
}
|
||||
|
||||
let total = stat_rows.len();
|
||||
let take = total.min(PROFILE_TABLE_MAX_COLUMNS);
|
||||
out.push_str(&format!("\n{} columns with stats\n", total));
|
||||
|
||||
for r in stat_rows.iter().take(take) {
|
||||
let attname: String = r.get(0);
|
||||
let null_frac: f32 = r.try_get(1).unwrap_or(0.0);
|
||||
let n_distinct: f32 = r.try_get(2).unwrap_or(0.0);
|
||||
let mcv_text: Option<String> = r.try_get(3).ok();
|
||||
let mcf_arr: Option<Vec<f32>> = r.try_get(4).ok();
|
||||
let hist_text: Option<String> = r.try_get(5).ok();
|
||||
|
||||
out.push_str(&format!("\n {}:\n", attname));
|
||||
out.push_str(&format!(" null_frac: {:.4}\n", null_frac));
|
||||
if n_distinct < 0.0 {
|
||||
out.push_str(&format!(
|
||||
" n_distinct: {:.3} (ratio of total rows)\n",
|
||||
-n_distinct
|
||||
));
|
||||
} else {
|
||||
out.push_str(&format!(" n_distinct: {}\n", n_distinct as i64));
|
||||
}
|
||||
|
||||
if let Some(text) = hist_text.as_deref() {
|
||||
let bounds = parse_pg_array_text_local(text);
|
||||
if let (Some(min), Some(max)) = (bounds.first(), bounds.last()) {
|
||||
out.push_str(&format!(" range: {} … {}\n", min, max));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(text) = mcv_text.as_deref() {
|
||||
let vals = parse_pg_array_text_local(text);
|
||||
if !vals.is_empty() {
|
||||
let freqs = mcf_arr.unwrap_or_default();
|
||||
let pairs: Vec<String> = vals
|
||||
.iter()
|
||||
.take(PROFILE_TABLE_TOPK)
|
||||
.enumerate()
|
||||
.map(|(i, v)| match freqs.get(i) {
|
||||
Some(f) => format!("{}({:.3})", v, f),
|
||||
None => v.clone(),
|
||||
})
|
||||
.collect();
|
||||
out.push_str(&format!(" top: {}\n", pairs.join(", ")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if total > take {
|
||||
out.push_str(&format!("\n…and {} more columns\n", total - take));
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Local pg-array parser used by profile_table; mirrors `parse_pg_array_text` in ai.rs
/// but kept local to avoid importing a private helper.
///
/// Splits the text form of a one-dimensional array (`{a,"b,c",d}`) into its
/// elements. Surrounding braces are optional. Inside double quotes a comma is
/// literal, a backslash makes the next character literal, and `""` yields one
/// quote character. An empty body (`{}` or blank input) gives an empty vector.
/// Empty elements are kept, including a trailing one, so element positions
/// stay aligned with any parallel array (e.g. frequencies).
fn parse_pg_array_text_local(s: &str) -> Vec<String> {
    let s = s.trim();
    let s = s.strip_prefix('{').unwrap_or(s);
    let s = s.strip_suffix('}').unwrap_or(s);
    if s.is_empty() {
        return Vec::new();
    }

    let mut out = Vec::new();
    let mut cur = String::new();
    let mut in_quotes = false;
    let mut chars = s.chars().peekable();
    while let Some(c) = chars.next() {
        match c {
            '"' if !in_quotes => in_quotes = true,
            '"' => {
                // Inside quotes: a doubled quote is a literal quote, a single
                // one closes the quoted section.
                if chars.peek() == Some(&'"') {
                    cur.push('"');
                    chars.next();
                } else {
                    in_quotes = false;
                }
            }
            ',' if !in_quotes => out.push(std::mem::take(&mut cur)),
            '\\' if in_quotes => {
                if let Some(next) = chars.next() {
                    cur.push(next);
                }
            }
            other => cur.push(other),
        }
    }

    // The body is non-empty here, so there is always one more element after
    // the last separator — even when that element is the empty string (`""`).
    out.push(cur);
    out
}
|
||||
|
||||
async fn profile_table_clickhouse(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
) -> TuskResult<String> {
|
||||
let client = state.get_ch_client(connection_id).await?;
|
||||
let active_db = client.database.clone();
|
||||
let dbn = if schema == "public" || schema.is_empty() {
|
||||
active_db
|
||||
} else {
|
||||
schema.to_string()
|
||||
};
|
||||
|
||||
let cols_sql = format!(
|
||||
"SELECT name, type FROM system.columns \
|
||||
WHERE database = '{}' AND table = '{}' \
|
||||
ORDER BY position LIMIT {}",
|
||||
dbn.replace('\'', "\\'"),
|
||||
table.replace('\'', "\\'"),
|
||||
PROFILE_TABLE_MAX_COLUMNS
|
||||
);
|
||||
let col_rows = client.fetch_objects(&cols_sql).await?;
|
||||
if col_rows.is_empty() {
|
||||
return Err(TuskError::Custom(format!(
|
||||
"Table '{}.{}' does not exist (or no privileges).",
|
||||
dbn, table
|
||||
)));
|
||||
}
|
||||
|
||||
let mut select_parts: Vec<String> = vec!["count() AS rows_total".to_string()];
|
||||
let mut col_names: Vec<String> = Vec::new();
|
||||
let mut col_types: Vec<String> = Vec::new();
|
||||
for r in &col_rows {
|
||||
let name = r.get("name").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
||||
let dtype = r.get("type").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
||||
if name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
col_names.push(name.clone());
|
||||
col_types.push(dtype);
|
||||
let q = name.replace('`', "``");
|
||||
select_parts.push(format!("countIf(`{}` IS NULL) AS null_{}", q, col_names.len()));
|
||||
select_parts.push(format!("uniqHLL12(`{}`) AS dist_{}", q, col_names.len()));
|
||||
select_parts.push(format!("toString(min(`{}`)) AS min_{}", q, col_names.len()));
|
||||
select_parts.push(format!("toString(max(`{}`)) AS max_{}", q, col_names.len()));
|
||||
select_parts.push(format!(
|
||||
"arrayStringConcat(arrayMap(x -> toString(x), topK({})(`{}`)), '|') AS top_{}",
|
||||
PROFILE_TABLE_TOPK,
|
||||
q,
|
||||
col_names.len()
|
||||
));
|
||||
}
|
||||
|
||||
let agg_sql = format!(
|
||||
"SELECT {} FROM `{}`.`{}`",
|
||||
select_parts.join(", "),
|
||||
dbn.replace('`', "``"),
|
||||
table.replace('`', "``")
|
||||
);
|
||||
let agg_rows = client.fetch_objects(&agg_sql).await?;
|
||||
let row = agg_rows
|
||||
.first()
|
||||
.ok_or_else(|| TuskError::Custom("ClickHouse returned no row for profile aggregate".into()))?;
|
||||
|
||||
let rows_total = row
|
||||
.get("rows_total")
|
||||
.and_then(|v| v.as_str().and_then(|s| s.parse::<i64>().ok()).or_else(|| v.as_i64()))
|
||||
.unwrap_or(0);
|
||||
|
||||
let mut out = format!(
|
||||
"PROFILE {}.{}\nRows: {}\n{} columns profiled\n",
|
||||
dbn,
|
||||
table,
|
||||
rows_total,
|
||||
col_names.len()
|
||||
);
|
||||
|
||||
for (i, name) in col_names.iter().enumerate() {
|
||||
let n = i + 1;
|
||||
let nulls = row
|
||||
.get(&format!("null_{}", n))
|
||||
.and_then(|v| v.as_str().and_then(|s| s.parse::<i64>().ok()).or_else(|| v.as_i64()))
|
||||
.unwrap_or(0);
|
||||
let dist = row
|
||||
.get(&format!("dist_{}", n))
|
||||
.and_then(|v| v.as_str().and_then(|s| s.parse::<i64>().ok()).or_else(|| v.as_i64()))
|
||||
.unwrap_or(0);
|
||||
let min = row.get(&format!("min_{}", n)).and_then(|v| v.as_str()).unwrap_or("");
|
||||
let max = row.get(&format!("max_{}", n)).and_then(|v| v.as_str()).unwrap_or("");
|
||||
let top_raw = row.get(&format!("top_{}", n)).and_then(|v| v.as_str()).unwrap_or("");
|
||||
|
||||
out.push_str(&format!("\n {} ({}):\n", name, col_types[i]));
|
||||
let null_frac = if rows_total > 0 {
|
||||
nulls as f64 / rows_total as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
out.push_str(&format!(" null_frac: {:.4}\n", null_frac));
|
||||
out.push_str(&format!(" distinct (HLL): {}\n", dist));
|
||||
if !min.is_empty() || !max.is_empty() {
|
||||
out.push_str(&format!(" range: {} … {}\n", min, max));
|
||||
}
|
||||
if !top_raw.is_empty() {
|
||||
let top_vals: Vec<&str> = top_raw.split('|').take(PROFILE_TABLE_TOPK).collect();
|
||||
out.push_str(&format!(" top: {}\n", top_vals.join(", ")));
|
||||
}
|
||||
}
|
||||
|
||||
if col_rows.len() == PROFILE_TABLE_MAX_COLUMNS {
|
||||
out.push_str(&format!(
|
||||
"\n…showing first {} columns\n",
|
||||
PROFILE_TABLE_MAX_COLUMNS
|
||||
));
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// sample_data (PR2 — returns SQL string; dispatch site runs it through
|
||||
// execute_query_core so the QueryResult feeds the standard renderer)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub async fn build_sample_sql(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
table: &str,
|
||||
limit: u32,
|
||||
) -> TuskResult<String> {
|
||||
let active_db = active_db_name(state, connection_id).await.unwrap_or_default();
|
||||
let (schema, tbl, _raw) = normalise_table_ref(table, &active_db);
|
||||
let flavor = state.get_flavor(connection_id).await;
|
||||
match flavor {
|
||||
DbFlavor::PostgreSQL | DbFlavor::Greenplum => {
|
||||
build_sample_sql_postgres(state, connection_id, &schema, &tbl, limit).await
|
||||
}
|
||||
DbFlavor::ClickHouse => {
|
||||
build_sample_sql_clickhouse(state, connection_id, &schema, &tbl, limit).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_sample_sql_postgres(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
limit: u32,
|
||||
) -> TuskResult<String> {
|
||||
let pool = state.get_pool(connection_id).await?;
|
||||
let reltuples: f64 = sqlx::query_scalar(
|
||||
"SELECT c.reltuples FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid \
|
||||
WHERE n.nspname = $1 AND c.relname = $2",
|
||||
)
|
||||
.bind(schema)
|
||||
.bind(table)
|
||||
.fetch_optional(&pool)
|
||||
.await
|
||||
.map_err(TuskError::Database)?
|
||||
.unwrap_or(0.0);
|
||||
|
||||
let qualified = format!("{}.{}", escape_ident(schema), escape_ident(table));
|
||||
if reltuples > 0.0 {
|
||||
let target = limit as f64 * 100.0 / reltuples;
|
||||
let percent = target.clamp(0.01, 100.0);
|
||||
Ok(format!(
|
||||
"SELECT * FROM {} TABLESAMPLE BERNOULLI({:.4}) LIMIT {}",
|
||||
qualified, percent, limit
|
||||
))
|
||||
} else {
|
||||
Ok(format!(
|
||||
"SELECT * FROM {} ORDER BY random() LIMIT {}",
|
||||
qualified, limit
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_sample_sql_clickhouse(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
limit: u32,
|
||||
) -> TuskResult<String> {
|
||||
let client = state.get_ch_client(connection_id).await?;
|
||||
let active_db = client.database.clone();
|
||||
let dbn = if schema == "public" || schema.is_empty() {
|
||||
active_db
|
||||
} else {
|
||||
schema.to_string()
|
||||
};
|
||||
|
||||
let info_sql = format!(
|
||||
"SELECT engine, sampling_key FROM system.tables \
|
||||
WHERE database = '{}' AND name = '{}' LIMIT 1",
|
||||
dbn.replace('\'', "\\'"),
|
||||
table.replace('\'', "\\'")
|
||||
);
|
||||
let rows = client.fetch_objects(&info_sql).await.unwrap_or_default();
|
||||
let (engine, sampling_key) = match rows.first() {
|
||||
Some(r) => (
|
||||
r.get("engine").and_then(|v| v.as_str()).unwrap_or("").to_string(),
|
||||
r.get("sampling_key").and_then(|v| v.as_str()).unwrap_or("").to_string(),
|
||||
),
|
||||
None => (String::new(), String::new()),
|
||||
};
|
||||
|
||||
let qualified = format!(
|
||||
"`{}`.`{}`",
|
||||
dbn.replace('`', "``"),
|
||||
table.replace('`', "``")
|
||||
);
|
||||
if engine.starts_with("Merge") && !sampling_key.trim().is_empty() {
|
||||
Ok(format!(
|
||||
"SELECT * FROM {} SAMPLE 0.01 LIMIT {}",
|
||||
qualified, limit
|
||||
))
|
||||
} else {
|
||||
Ok(format!(
|
||||
"SELECT * FROM {} ORDER BY rand() LIMIT {}",
|
||||
qualified, limit
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// explain_query (PR2)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub async fn explain_query_tool(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
sql: &str,
|
||||
) -> TuskResult<String> {
|
||||
let trimmed = sql.trim();
|
||||
if trimmed.is_empty() {
|
||||
return Err(TuskError::Custom("explain_query: sql must not be empty".into()));
|
||||
}
|
||||
// Validate the user's statement BEFORE prefixing EXPLAIN so the error message
|
||||
// references their SQL, not the wrapper. ensure_readonly_sql also rejects any
|
||||
// forbidden keywords (INSERT/UPDATE/DELETE/...) even nested under EXPLAIN.
|
||||
ensure_readonly_sql(trimmed).map_err(|e| TuskError::Custom(e.to_string()))?;
|
||||
|
||||
let flavor = state.get_flavor(connection_id).await;
|
||||
match flavor {
|
||||
DbFlavor::PostgreSQL | DbFlavor::Greenplum => {
|
||||
explain_query_postgres(state, connection_id, trimmed).await
|
||||
}
|
||||
DbFlavor::ClickHouse => explain_query_clickhouse(state, connection_id, trimmed).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn explain_query_postgres(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
sql: &str,
|
||||
) -> TuskResult<String> {
|
||||
let pool = state.get_pool(connection_id).await?;
|
||||
let plan_sql = format!("EXPLAIN (FORMAT JSON, ANALYZE, BUFFERS) {}", sql);
|
||||
let mut tx = pool.begin().await.map_err(TuskError::Database)?;
|
||||
sqlx::query("SET TRANSACTION READ ONLY")
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(TuskError::Database)?;
|
||||
let row = sqlx::query(&plan_sql)
|
||||
.fetch_one(&mut *tx)
|
||||
.await
|
||||
.map_err(TuskError::Database)?;
|
||||
let _ = tx.rollback().await;
|
||||
|
||||
let raw_json: serde_json::Value = match row.try_get::<serde_json::Value, _>(0) {
|
||||
Ok(v) => v,
|
||||
Err(_) => {
|
||||
let s: String = row.try_get(0).map_err(TuskError::Database)?;
|
||||
serde_json::from_str(&s)
|
||||
.map_err(|e| TuskError::Custom(format!("EXPLAIN JSON parse failed: {}", e)))?
|
||||
}
|
||||
};
|
||||
|
||||
let plans = raw_json
|
||||
.as_array()
|
||||
.ok_or_else(|| TuskError::Custom("EXPLAIN JSON: expected array".into()))?;
|
||||
let plan = plans.first().and_then(|p| p.get("Plan")).ok_or_else(|| {
|
||||
TuskError::Custom("EXPLAIN JSON: missing top-level Plan node".into())
|
||||
})?;
|
||||
|
||||
let root_node = plan.get("Node Type").and_then(|v| v.as_str()).unwrap_or("?");
|
||||
let total_cost = plan.get("Total Cost").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
||||
let planning = plans
|
||||
.first()
|
||||
.and_then(|p| p.get("Planning Time").and_then(|v| v.as_f64()))
|
||||
.unwrap_or(0.0);
|
||||
let execution = plans
|
||||
.first()
|
||||
.and_then(|p| p.get("Execution Time").and_then(|v| v.as_f64()))
|
||||
.unwrap_or(0.0);
|
||||
|
||||
let mut seq_scans: Vec<String> = Vec::new();
|
||||
let mut spilled: Vec<String> = Vec::new();
|
||||
let mut motions: Vec<String> = Vec::new();
|
||||
let mut max_skew: Option<(f64, String)> = None;
|
||||
walk_pg_plan(plan, &mut seq_scans, &mut spilled, &mut motions, &mut max_skew);
|
||||
|
||||
let mut out = format!(
|
||||
"PLAN root: {}, total cost {:.1}\nPlanning: {:.2} ms Execution: {:.2} ms\n",
|
||||
root_node, total_cost, planning, execution
|
||||
);
|
||||
if !seq_scans.is_empty() {
|
||||
out.push_str(&format!("Seq scans on: {}\n", seq_scans.join(", ")));
|
||||
}
|
||||
if !spilled.is_empty() {
|
||||
out.push_str(&format!("Spilled to disk: {}\n", spilled.join(", ")));
|
||||
}
|
||||
if !motions.is_empty() {
|
||||
out.push_str(&format!("Motions (Greenplum): {}\n", motions.join(", ")));
|
||||
}
|
||||
if let Some((ratio, node)) = max_skew {
|
||||
if ratio >= 5.0 {
|
||||
out.push_str(&format!(
|
||||
"Estimate skew: max plan/actual ratio = {:.1} on {}\n",
|
||||
ratio, node
|
||||
));
|
||||
}
|
||||
}
|
||||
if seq_scans.is_empty() && spilled.is_empty() && motions.is_empty() {
|
||||
out.push_str("No obvious red flags.\n");
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn walk_pg_plan(
|
||||
node: &serde_json::Value,
|
||||
seq_scans: &mut Vec<String>,
|
||||
spilled: &mut Vec<String>,
|
||||
motions: &mut Vec<String>,
|
||||
max_skew: &mut Option<(f64, String)>,
|
||||
) {
|
||||
let node_type = node.get("Node Type").and_then(|v| v.as_str()).unwrap_or("");
|
||||
if node_type == "Seq Scan" {
|
||||
let rel = node
|
||||
.get("Relation Name")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("?");
|
||||
let schema = node
|
||||
.get("Schema")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| format!("{}.", s))
|
||||
.unwrap_or_default();
|
||||
seq_scans.push(format!("{}{}", schema, rel));
|
||||
}
|
||||
if let Some(method) = node.get("Sort Method").and_then(|v| v.as_str()) {
|
||||
if method.contains("disk") || method.contains("external") {
|
||||
spilled.push(format!("Sort ({})", method));
|
||||
}
|
||||
}
|
||||
if node_type.contains("Motion") {
|
||||
motions.push(node_type.to_string());
|
||||
}
|
||||
let plan_rows = node.get("Plan Rows").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
||||
let actual_rows = node.get("Actual Rows").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
||||
if actual_rows > 0.0 && plan_rows > 0.0 {
|
||||
let ratio = (plan_rows / actual_rows).max(actual_rows / plan_rows);
|
||||
if max_skew.as_ref().map(|(r, _)| ratio > *r).unwrap_or(true) {
|
||||
*max_skew = Some((ratio, node_type.to_string()));
|
||||
}
|
||||
}
|
||||
if let Some(children) = node.get("Plans").and_then(|v| v.as_array()) {
|
||||
for child in children {
|
||||
walk_pg_plan(child, seq_scans, spilled, motions, max_skew);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn explain_query_clickhouse(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
sql: &str,
|
||||
) -> TuskResult<String> {
|
||||
let client = state.get_ch_client(connection_id).await?;
|
||||
let plan_sql = format!("EXPLAIN PLAN {}", sql);
|
||||
let qr = client.execute_query(&plan_sql, true).await?;
|
||||
if qr.rows.is_empty() {
|
||||
return Ok("(empty plan)".to_string());
|
||||
}
|
||||
let mut out = String::from("ClickHouse plan:\n");
|
||||
for row in &qr.rows {
|
||||
if let Some(cell) = row.first() {
|
||||
if let Some(s) = cell.as_str() {
|
||||
out.push_str(s);
|
||||
out.push('\n');
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// detect_skew (PR2 — Greenplum-only)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub async fn detect_skew_tool(
|
||||
state: &AppState,
|
||||
connection_id: &str,
|
||||
table: &str,
|
||||
) -> TuskResult<String> {
|
||||
let flavor = state.get_flavor(connection_id).await;
|
||||
if !matches!(flavor, DbFlavor::Greenplum) {
|
||||
return Ok("detect_skew is only available on Greenplum connections.".to_string());
|
||||
}
|
||||
let active_db = active_db_name(state, connection_id).await.unwrap_or_default();
|
||||
let (schema, tbl, _raw) = normalise_table_ref(table, &active_db);
|
||||
|
||||
let qualified = format!("{}.{}", escape_ident(&schema), escape_ident(&tbl));
|
||||
let sql = format!(
|
||||
"SELECT gp_segment_id, COUNT(*) AS n FROM {} GROUP BY 1 ORDER BY 1",
|
||||
qualified
|
||||
);
|
||||
let qr = execute_query_core(state, connection_id, &sql).await?;
|
||||
|
||||
let mut counts: Vec<(i64, i64)> = Vec::new();
|
||||
for row in &qr.rows {
|
||||
let seg = row
|
||||
.get(0)
|
||||
.and_then(|v| v.as_i64().or_else(|| v.as_str().and_then(|s| s.parse().ok())))
|
||||
.unwrap_or(0);
|
||||
let n = row
|
||||
.get(1)
|
||||
.and_then(|v| v.as_i64().or_else(|| v.as_str().and_then(|s| s.parse().ok())))
|
||||
.unwrap_or(0);
|
||||
counts.push((seg, n));
|
||||
}
|
||||
|
||||
if counts.is_empty() {
|
||||
return Ok(format!("Table {}.{} is empty.", schema, tbl));
|
||||
}
|
||||
|
||||
let total: i64 = counts.iter().map(|(_, n)| *n).sum();
|
||||
let max = counts.iter().map(|(_, n)| *n).max().unwrap_or(0);
|
||||
let min = counts.iter().map(|(_, n)| *n).min().unwrap_or(0);
|
||||
let avg = total as f64 / counts.len() as f64;
|
||||
let ratio = if avg > 0.0 { max as f64 / avg } else { 0.0 };
|
||||
|
||||
let mut out = format!(
|
||||
"Per-segment row distribution for {}.{}\nsegments: {} total rows: {}\nmin: {} max: {} avg: {:.0}\nskew ratio (max/avg): {:.2}",
|
||||
schema,
|
||||
tbl,
|
||||
counts.len(),
|
||||
total,
|
||||
min,
|
||||
max,
|
||||
avg,
|
||||
ratio
|
||||
);
|
||||
if ratio > 1.5 {
|
||||
out.push_str(" ⚠ uneven distribution\n");
|
||||
} else {
|
||||
out.push_str(" OK — within 1.5x of average\n");
|
||||
}
|
||||
|
||||
let pool = state.get_pool(connection_id).await?;
|
||||
if let Some(policy) = fetch_gp_distribution_for(&pool, &schema, &tbl).await {
|
||||
out.push_str(&format!("\nCurrent policy: {}\n", policy));
|
||||
if ratio > 1.5 {
|
||||
out.push_str(
|
||||
"Hint: pick a higher-cardinality column. Run profile_table to compare n_distinct.\n",
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Fetch the Greenplum DISTRIBUTED BY policy for a single table. Returns None if
|
||||
/// the catalog query fails (non-GP connection, missing privileges, etc.).
|
||||
async fn fetch_gp_distribution_for(
|
||||
pool: &PgPool,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
) -> Option<String> {
|
||||
let row = sqlx::query(
|
||||
"SELECT COALESCE(\
|
||||
(SELECT array_agg(a.attname ORDER BY ord.idx) \
|
||||
FROM regexp_split_to_table(NULLIF(trim(p.distkey::text), ''), ' ') \
|
||||
WITH ORDINALITY AS ord(attnum_str, idx) \
|
||||
JOIN pg_attribute a \
|
||||
ON a.attrelid = c.oid \
|
||||
AND a.attnum::int = ord.attnum_str::int), \
|
||||
ARRAY[]::text[] \
|
||||
) AS dist_columns \
|
||||
FROM gp_distribution_policy p \
|
||||
JOIN pg_class c ON p.localoid = c.oid \
|
||||
JOIN pg_namespace n ON c.relnamespace = n.oid \
|
||||
WHERE n.nspname = $1 AND c.relname = $2",
|
||||
)
|
||||
.bind(schema)
|
||||
.bind(table)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()?;
|
||||
let cols: Vec<String> = row.try_get(0).ok()?;
|
||||
Some(if cols.is_empty() {
|
||||
"DISTRIBUTED RANDOMLY".to_string()
|
||||
} else {
|
||||
format!("DISTRIBUTED BY ({})", cols.join(", "))
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user