diff --git a/src-tauri/src/commands/ai.rs b/src-tauri/src/commands/ai.rs index 0d78fe6..49619bb 100644 --- a/src-tauri/src/commands/ai.rs +++ b/src-tauri/src/commands/ai.rs @@ -613,6 +613,24 @@ async fn build_overview_postgres(state: &AppState, connection_id: &str) -> TuskR .to_string(), ); + if matches!(state.get_flavor(connection_id).await, DbFlavor::Greenplum) { + out.push(String::new()); + out.push( + "GREENPLUM NOTES (this is an MPP cluster, not vanilla PostgreSQL):\n\ + - When you call get_columns on a table you'll see a `-- GP: DISTRIBUTED BY (...) | STORAGE: ...` line. \ + Use it for join planning.\n\ + - JOINs on the distribution key avoid Redistribute Motion (often 10-100x faster); \ + JOINs on non-distribution columns broadcast or redistribute the smaller side.\n\ + - AO/AOCO storage is optimized for large analytical scans; heap is for point lookups. \ + Don't run wide row-by-row predicates against AOCO without a covering filter.\n\ + - For data-skew diagnostics on a table T:\n\ + SELECT gp_segment_id, count(*) FROM T GROUP BY 1 ORDER BY 2 DESC LIMIT 10;\n\ + - Greenplum extends PostgreSQL syntax — most things work, but some features differ \ + by major version (GP6 ≈ PG9.4 catalog, GP7 ≈ PG14)." + .to_string(), + ); + } + Ok(out.join("\n")) } @@ -690,10 +708,14 @@ pub(crate) async fn build_schema_context( return Ok(cached); } - if matches!(state.get_flavor(connection_id).await, DbFlavor::ClickHouse) { + let flavor = state.get_flavor(connection_id).await; + if matches!(flavor, DbFlavor::ClickHouse) { return build_clickhouse_schema_context(state, connection_id).await; } + let is_greenplum = matches!(flavor, DbFlavor::Greenplum); + let gp_major = state.get_gp_major(connection_id).await.unwrap_or(7); + let pool = state.get_pool(connection_id).await?; // Run all metadata queries in parallel for speed @@ -739,6 +761,11 @@ pub(crate) async fn build_schema_context( let unique_constraints = unique_res?; let varchar_values = varchar_res.unwrap_or_default(); let jsonb_keys = jsonb_res.unwrap_or_default(); + let gp_extras = if is_greenplum { + Some(fetch_gp_table_extras(&pool, gp_major).await) + } else { + None + }; // -- Build FK inline lookup: (schema, table, column) -> "ref_schema.ref_table(ref_col)" -- let mut fk_inline: HashMap<(String, String, String), String> = HashMap::new(); @@ -869,6 +896,7 @@ pub(crate) async fn build_schema_context( &unique_map, &varchar_values, &jsonb_keys, + gp_extras.as_ref(), &mut output, ); } @@ -921,6 +949,7 @@ pub(crate) fn format_table_block( unique_map: &HashMap<(String, String), Vec>, varchar_values: &HashMap<(String, String, String), Vec>, jsonb_keys: &HashMap<(String, String, String), Vec>, + gp_extras: Option<&HashMap<(String, String), GpTableExtras>>, output: &mut Vec, ) { let tbl_comment = tbl_comments.get(full_name).map(|c| c.as_str()); @@ -929,6 +958,29 @@ pub(crate) fn format_table_block( None => output.push(format!("\nTABLE {}", full_name)), } + // Greenplum-specific table attributes (distribution + storage), printed + // immediately under the TABLE header so the agent sees them before any + // column. Skipped on non-GP connections. + if let Some(extras_map) = gp_extras { + let parts: Vec<&str> = full_name.splitn(2, '.').collect(); + if parts.len() == 2 { + if let Some(extras) = + extras_map.get(&(parts[0].to_string(), parts[1].to_string())) + { + let mut bits: Vec = Vec::new(); + if !extras.distribution.is_empty() { + bits.push(extras.distribution.clone()); + } + if let Some(storage) = &extras.storage { + bits.push(format!("STORAGE: {}", storage)); + } + if !bits.is_empty() { + output.push(format!(" -- GP: {}", bits.join(" | "))); + } + } + } + } + for ci in columns { let mut parts: Vec = vec![ci.column.clone(), ci.data_type.clone()]; @@ -1446,6 +1498,119 @@ async fn fetch_jsonb_keys( Some(map) } +// --------------------------------------------------------------------------- +// Greenplum-specific table attributes +// --------------------------------------------------------------------------- + +/// Per-table Greenplum metadata that aggressively shapes query performance: +/// the data-distribution policy and the storage type. Without these visible +/// to the chat agent it tends to write PG-optimal SQL that triggers +/// Redistribute Motion in GP and runs orders of magnitude slower than needed. +#[derive(Debug, Clone, Default)] +pub(crate) struct GpTableExtras { + /// Pre-formatted policy string: e.g. "DISTRIBUTED BY (id, created_at)" or + /// "DISTRIBUTED RANDOMLY". Empty string means "policy unknown". + pub distribution: String, + /// Storage kind: "heap" / "AO row" / "AO column" / "external" / etc. + /// `None` when storage couldn't be determined for this catalog version. + pub storage: Option, +} + +/// Fetch DISTRIBUTED BY policy + storage kind for every user table in the +/// current GP database. Two parallel queries; storage uses the GP-major-aware +/// catalog (GP6 has `pg_class.relstorage`, GP7 dropped it and uses access +/// methods via `pg_am`). Returns an empty map on any catalog miss so callers +/// degrade gracefully on unfamiliar GP releases. +pub(crate) async fn fetch_gp_table_extras( + pool: &sqlx::PgPool, + gp_major: u8, +) -> HashMap<(String, String), GpTableExtras> { + let dist_query = " + SELECT n.nspname, c.relname, + COALESCE( + (SELECT array_agg(a.attname ORDER BY ord.idx) + FROM regexp_split_to_table(NULLIF(trim(p.distkey::text), ''), ' ') + WITH ORDINALITY AS ord(attnum_str, idx) + JOIN pg_attribute a + ON a.attrelid = c.oid + AND a.attnum::int = ord.attnum_str::int), + ARRAY[]::text[] + ) AS dist_columns + FROM gp_distribution_policy p + JOIN pg_class c ON p.localoid = c.oid + JOIN pg_namespace n ON c.relnamespace = n.oid + WHERE n.nspname NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit') + "; + + let storage_query = if gp_major <= 6 { + // GP6 (PG9.4 base) — reltype is encoded in pg_class.relstorage. + "SELECT n.nspname, c.relname, + CASE c.relstorage + WHEN 'h' THEN 'heap' + WHEN 'a' THEN 'AO row' + WHEN 'c' THEN 'AO column' + WHEN 'x' THEN 'external' + ELSE c.relstorage::text + END AS storage + FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + WHERE c.relkind IN ('r','m') + AND n.nspname NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit')" + } else { + // GP7 (PG14 base) — relstorage is gone; access method on pg_class.relam + // identifies AO row / AO column tables (extension AM names are + // 'ao_row' and 'ao_column'). + "SELECT n.nspname, c.relname, + CASE + WHEN am.amname = 'heap' THEN 'heap' + WHEN am.amname = 'ao_row' THEN 'AO row' + WHEN am.amname = 'ao_column' THEN 'AO column' + ELSE COALESCE(am.amname, 'unknown') + END AS storage + FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + LEFT JOIN pg_am am ON c.relam = am.oid + WHERE c.relkind IN ('r','m') + AND n.nspname NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit')" + }; + + let (dist_res, storage_res) = tokio::join!( + sqlx::query(dist_query).fetch_all(pool), + sqlx::query(storage_query).fetch_all(pool), + ); + + let mut map: HashMap<(String, String), GpTableExtras> = HashMap::new(); + + if let Ok(rows) = dist_res { + for r in &rows { + let schema: String = r.get(0); + let table: String = r.get(1); + let cols: Vec = r.get(2); + let distribution = if cols.is_empty() { + "DISTRIBUTED RANDOMLY".to_string() + } else { + format!("DISTRIBUTED BY ({})", cols.join(", ")) + }; + map.entry((schema, table)).or_default().distribution = distribution; + } + } else if let Err(e) = dist_res { + log::warn!("gp_distribution_policy fetch failed: {}", e); + } + + if let Ok(rows) = storage_res { + for r in &rows { + let schema: String = r.get(0); + let table: String = r.get(1); + let storage: String = r.get(2); + map.entry((schema, table)).or_default().storage = Some(storage); + } + } else if let Err(e) = storage_res { + log::warn!("Greenplum storage fetch failed: {}", e); + } + + map +} + // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- diff --git a/src-tauri/src/commands/chat_tools.rs b/src-tauri/src/commands/chat_tools.rs index cd18373..3608c74 100644 --- a/src-tauri/src/commands/chat_tools.rs +++ b/src-tauri/src/commands/chat_tools.rs @@ -6,7 +6,8 @@ use crate::commands::ai::{ fetch_column_comments, fetch_columns, fetch_enum_types, fetch_foreign_keys_raw, - fetch_table_comments, fetch_unique_constraints, format_table_block, ColumnInfo, + fetch_gp_table_extras, fetch_table_comments, fetch_unique_constraints, format_table_block, + ColumnInfo, }; use crate::commands::connections::{load_connection_config, switch_database_core}; use crate::commands::saved_queries::{list_saved_queries_core, save_query_core}; @@ -219,6 +220,8 @@ async fn get_columns_postgres( requested: &[(String, String, String)], ) -> TuskResult { let pool = state.get_pool(connection_id).await?; + let is_greenplum = matches!(state.get_flavor(connection_id).await, DbFlavor::Greenplum); + let gp_major = state.get_gp_major(connection_id).await.unwrap_or(7); let (col_res, fk_res, enum_res, tbl_comm_res, col_comm_res, unique_res) = tokio::join!( fetch_columns(&pool), @@ -234,6 +237,11 @@ async fn get_columns_postgres( let tbl_comments = tbl_comm_res.unwrap_or_default(); let col_comments = col_comm_res.unwrap_or_default(); let uniques = unique_res.unwrap_or_default(); + let gp_extras = if is_greenplum { + Some(fetch_gp_table_extras(&pool, gp_major).await) + } else { + None + }; // Build (schema, table) → Vec let mut by_table: BTreeMap<(String, String), Vec> = BTreeMap::new(); @@ -282,6 +290,7 @@ async fn get_columns_postgres( &unique_map, &varchar_values, &jsonb_keys, + gp_extras.as_ref(), &mut output, ); } diff --git a/src-tauri/src/commands/connections.rs b/src-tauri/src/commands/connections.rs index f88664d..31a01a9 100644 --- a/src-tauri/src/commands/connections.rs +++ b/src-tauri/src/commands/connections.rs @@ -99,6 +99,7 @@ async fn close_connection(state: &AppState, id: &str) { let mut flavors = state.db_flavors.write().await; flavors.remove(id); drop(flavors); + state.gp_majors.write().await.remove(id); state.invalidate_chat_caches_for(id).await; } @@ -196,6 +197,19 @@ pub async fn connect( } else { DbFlavor::PostgreSQL }; + // Pull the GP major (6 or 7) from a string like + // "PostgreSQL 14.4 (Greenplum Database 7.1.0 ...)". + // GP6 and GP7 expose very different system catalogs, so downstream + // schema-introspection code branches on this. + let gp_major: Option = if flavor == DbFlavor::Greenplum { + version + .split("Greenplum Database ") + .nth(1) + .and_then(|tail| tail.split('.').next()) + .and_then(|major| major.parse::().ok()) + } else { + None + }; state.pools.write().await.insert(config.id.clone(), pool); state.read_only.write().await.insert(config.id.clone(), true); state @@ -203,6 +217,11 @@ pub async fn connect( .write() .await .insert(config.id.clone(), flavor); + if let Some(m) = gp_major { + state.gp_majors.write().await.insert(config.id.clone(), m); + } else { + state.gp_majors.write().await.remove(&config.id); + } Ok(ConnectResult { version, flavor }) } } diff --git a/src-tauri/src/state.rs b/src-tauri/src/state.rs index a272bc0..3336c65 100644 --- a/src-tauri/src/state.rs +++ b/src-tauri/src/state.rs @@ -40,6 +40,9 @@ pub struct AppState { pub ch_clients: RwLock>>, pub read_only: RwLock>, pub db_flavors: RwLock>, + /// Greenplum major version (6 or 7), tracked separately because GP6 and GP7 + /// expose very different system catalogs (GP6 = PG9.4 base, GP7 = PG14 base). + pub gp_majors: RwLock>, /// Legacy cache used by generate_sql/explain_sql/fix_sql_error — full DDL. pub schema_cache: RwLock>, /// Chat v2 caches: lite overview per connection. @@ -65,6 +68,7 @@ impl AppState { ch_clients: RwLock::new(HashMap::new()), read_only: RwLock::new(HashMap::new()), db_flavors: RwLock::new(HashMap::new()), + gp_majors: RwLock::new(HashMap::new()), schema_cache: RwLock::new(HashMap::new()), overview_cache: RwLock::new(HashMap::new()), tables_by_db_cache: RwLock::new(HashMap::new()), @@ -116,6 +120,12 @@ impl AppState { map.get(id).copied().unwrap_or(DbFlavor::PostgreSQL) } + /// Returns the Greenplum major version (6 or 7) for a connection, or None + /// for non-GP connections / when the version string couldn't be parsed. + pub async fn get_gp_major(&self, id: &str) -> Option { + self.gp_majors.read().await.get(id).copied() + } + pub async fn get_schema_cache(&self, connection_id: &str) -> Option { let cache = self.schema_cache.read().await; cache.get(connection_id).and_then(|entry| {