feat(greenplum): expose distribution key + storage type to chat agent
When connected to a Greenplum cluster, schema introspection now surfaces the two attributes that dominate query performance there: the data distribution policy (DISTRIBUTED BY / RANDOMLY) and the storage kind (heap / AO row / AO column). Without these, the agent writes PG-optimal SQL that triggers Redistribute Motion in GP and runs orders of magnitude slower than necessary.

- track Greenplum major version (6 vs 7) at connect time, since GP6 uses pg_class.relstorage and GP7 dropped it in favor of pg_am
- new fetch_gp_table_extras helper queries gp_distribution_policy and the version-appropriate storage catalog, returns a per-table map
- format_table_block prints ` -- GP: DISTRIBUTED BY (...) | STORAGE: ...` under the TABLE header when extras are available
- build_overview_postgres appends a GREENPLUM NOTES block with the distribution-aware-join rules and a skew-detection one-liner; the agent sees this in every system prompt on a GP connection
- build_schema_context (legacy generate_sql / explain_sql / fix_sql_error path) and the chat get_columns tool both feed extras into format_table_block

P0 of the GP support arc — partitioning, EXPLAIN motion-aware parsing, external tables, resource queues, and a skew-check tool are deliberately deferred to follow-up commits.
This commit is contained in:
@@ -613,6 +613,24 @@ async fn build_overview_postgres(state: &AppState, connection_id: &str) -> TuskR
|
||||
.to_string(),
|
||||
);
|
||||
|
||||
if matches!(state.get_flavor(connection_id).await, DbFlavor::Greenplum) {
|
||||
out.push(String::new());
|
||||
out.push(
|
||||
"GREENPLUM NOTES (this is an MPP cluster, not vanilla PostgreSQL):\n\
|
||||
- When you call get_columns on a table you'll see a `-- GP: DISTRIBUTED BY (...) | STORAGE: ...` line. \
|
||||
Use it for join planning.\n\
|
||||
- JOINs on the distribution key avoid Redistribute Motion (often 10-100x faster); \
|
||||
JOINs on non-distribution columns broadcast or redistribute the smaller side.\n\
|
||||
- AO/AOCO storage is optimized for large analytical scans; heap is for point lookups. \
|
||||
Don't run wide row-by-row predicates against AOCO without a covering filter.\n\
|
||||
- For data-skew diagnostics on a table T:\n\
|
||||
SELECT gp_segment_id, count(*) FROM T GROUP BY 1 ORDER BY 2 DESC LIMIT 10;\n\
|
||||
- Greenplum extends PostgreSQL syntax — most things work, but some features differ \
|
||||
by major version (GP6 ≈ PG9.4 catalog, GP7 ≈ PG14)."
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(out.join("\n"))
|
||||
}
|
||||
|
||||
@@ -690,10 +708,14 @@ pub(crate) async fn build_schema_context(
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
if matches!(state.get_flavor(connection_id).await, DbFlavor::ClickHouse) {
|
||||
let flavor = state.get_flavor(connection_id).await;
|
||||
if matches!(flavor, DbFlavor::ClickHouse) {
|
||||
return build_clickhouse_schema_context(state, connection_id).await;
|
||||
}
|
||||
|
||||
let is_greenplum = matches!(flavor, DbFlavor::Greenplum);
|
||||
let gp_major = state.get_gp_major(connection_id).await.unwrap_or(7);
|
||||
|
||||
let pool = state.get_pool(connection_id).await?;
|
||||
|
||||
// Run all metadata queries in parallel for speed
|
||||
@@ -739,6 +761,11 @@ pub(crate) async fn build_schema_context(
|
||||
let unique_constraints = unique_res?;
|
||||
let varchar_values = varchar_res.unwrap_or_default();
|
||||
let jsonb_keys = jsonb_res.unwrap_or_default();
|
||||
let gp_extras = if is_greenplum {
|
||||
Some(fetch_gp_table_extras(&pool, gp_major).await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// -- Build FK inline lookup: (schema, table, column) -> "ref_schema.ref_table(ref_col)" --
|
||||
let mut fk_inline: HashMap<(String, String, String), String> = HashMap::new();
|
||||
@@ -869,6 +896,7 @@ pub(crate) async fn build_schema_context(
|
||||
&unique_map,
|
||||
&varchar_values,
|
||||
&jsonb_keys,
|
||||
gp_extras.as_ref(),
|
||||
&mut output,
|
||||
);
|
||||
}
|
||||
@@ -921,6 +949,7 @@ pub(crate) fn format_table_block(
|
||||
unique_map: &HashMap<(String, String), Vec<String>>,
|
||||
varchar_values: &HashMap<(String, String, String), Vec<String>>,
|
||||
jsonb_keys: &HashMap<(String, String, String), Vec<String>>,
|
||||
gp_extras: Option<&HashMap<(String, String), GpTableExtras>>,
|
||||
output: &mut Vec<String>,
|
||||
) {
|
||||
let tbl_comment = tbl_comments.get(full_name).map(|c| c.as_str());
|
||||
@@ -929,6 +958,29 @@ pub(crate) fn format_table_block(
|
||||
None => output.push(format!("\nTABLE {}", full_name)),
|
||||
}
|
||||
|
||||
// Greenplum-specific table attributes (distribution + storage), printed
|
||||
// immediately under the TABLE header so the agent sees them before any
|
||||
// column. Skipped on non-GP connections.
|
||||
if let Some(extras_map) = gp_extras {
|
||||
let parts: Vec<&str> = full_name.splitn(2, '.').collect();
|
||||
if parts.len() == 2 {
|
||||
if let Some(extras) =
|
||||
extras_map.get(&(parts[0].to_string(), parts[1].to_string()))
|
||||
{
|
||||
let mut bits: Vec<String> = Vec::new();
|
||||
if !extras.distribution.is_empty() {
|
||||
bits.push(extras.distribution.clone());
|
||||
}
|
||||
if let Some(storage) = &extras.storage {
|
||||
bits.push(format!("STORAGE: {}", storage));
|
||||
}
|
||||
if !bits.is_empty() {
|
||||
output.push(format!(" -- GP: {}", bits.join(" | ")));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for ci in columns {
|
||||
let mut parts: Vec<String> = vec![ci.column.clone(), ci.data_type.clone()];
|
||||
|
||||
@@ -1446,6 +1498,119 @@ async fn fetch_jsonb_keys(
|
||||
Some(map)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Greenplum-specific table attributes
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Greenplum-only attributes of a single table: how its rows are spread
/// across segments and which storage engine backs it.
///
/// Both attributes strongly influence query performance on Greenplum, so
/// they are surfaced to the chat agent alongside the regular column listing.
/// `Default` yields the "nothing known yet" value (empty policy, no storage).
#[derive(Clone, Debug, Default)]
pub(crate) struct GpTableExtras {
    /// Ready-to-print distribution clause, for example
    /// "DISTRIBUTED BY (id, created_at)" or "DISTRIBUTED RANDOMLY".
    /// An empty string means the policy is unknown.
    pub distribution: String,
    /// Human-readable storage kind such as "heap", "AO row", "AO column" or
    /// "external". `None` when it could not be determined for this catalog
    /// version.
    pub storage: Option<String>,
}
|
||||
|
||||
/// Fetch DISTRIBUTED BY policy + storage kind for every user table in the
|
||||
/// current GP database. Two parallel queries; storage uses the GP-major-aware
|
||||
/// catalog (GP6 has `pg_class.relstorage`, GP7 dropped it and uses access
|
||||
/// methods via `pg_am`). Returns an empty map on any catalog miss so callers
|
||||
/// degrade gracefully on unfamiliar GP releases.
|
||||
pub(crate) async fn fetch_gp_table_extras(
|
||||
pool: &sqlx::PgPool,
|
||||
gp_major: u8,
|
||||
) -> HashMap<(String, String), GpTableExtras> {
|
||||
let dist_query = "
|
||||
SELECT n.nspname, c.relname,
|
||||
COALESCE(
|
||||
(SELECT array_agg(a.attname ORDER BY ord.idx)
|
||||
FROM regexp_split_to_table(NULLIF(trim(p.distkey::text), ''), ' ')
|
||||
WITH ORDINALITY AS ord(attnum_str, idx)
|
||||
JOIN pg_attribute a
|
||||
ON a.attrelid = c.oid
|
||||
AND a.attnum::int = ord.attnum_str::int),
|
||||
ARRAY[]::text[]
|
||||
) AS dist_columns
|
||||
FROM gp_distribution_policy p
|
||||
JOIN pg_class c ON p.localoid = c.oid
|
||||
JOIN pg_namespace n ON c.relnamespace = n.oid
|
||||
WHERE n.nspname NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit')
|
||||
";
|
||||
|
||||
let storage_query = if gp_major <= 6 {
|
||||
// GP6 (PG9.4 base) — reltype is encoded in pg_class.relstorage.
|
||||
"SELECT n.nspname, c.relname,
|
||||
CASE c.relstorage
|
||||
WHEN 'h' THEN 'heap'
|
||||
WHEN 'a' THEN 'AO row'
|
||||
WHEN 'c' THEN 'AO column'
|
||||
WHEN 'x' THEN 'external'
|
||||
ELSE c.relstorage::text
|
||||
END AS storage
|
||||
FROM pg_class c
|
||||
JOIN pg_namespace n ON c.relnamespace = n.oid
|
||||
WHERE c.relkind IN ('r','m')
|
||||
AND n.nspname NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit')"
|
||||
} else {
|
||||
// GP7 (PG14 base) — relstorage is gone; access method on pg_class.relam
|
||||
// identifies AO row / AO column tables (extension AM names are
|
||||
// 'ao_row' and 'ao_column').
|
||||
"SELECT n.nspname, c.relname,
|
||||
CASE
|
||||
WHEN am.amname = 'heap' THEN 'heap'
|
||||
WHEN am.amname = 'ao_row' THEN 'AO row'
|
||||
WHEN am.amname = 'ao_column' THEN 'AO column'
|
||||
ELSE COALESCE(am.amname, 'unknown')
|
||||
END AS storage
|
||||
FROM pg_class c
|
||||
JOIN pg_namespace n ON c.relnamespace = n.oid
|
||||
LEFT JOIN pg_am am ON c.relam = am.oid
|
||||
WHERE c.relkind IN ('r','m')
|
||||
AND n.nspname NOT IN ('pg_catalog','information_schema','pg_toast','gp_toolkit')"
|
||||
};
|
||||
|
||||
let (dist_res, storage_res) = tokio::join!(
|
||||
sqlx::query(dist_query).fetch_all(pool),
|
||||
sqlx::query(storage_query).fetch_all(pool),
|
||||
);
|
||||
|
||||
let mut map: HashMap<(String, String), GpTableExtras> = HashMap::new();
|
||||
|
||||
if let Ok(rows) = dist_res {
|
||||
for r in &rows {
|
||||
let schema: String = r.get(0);
|
||||
let table: String = r.get(1);
|
||||
let cols: Vec<String> = r.get(2);
|
||||
let distribution = if cols.is_empty() {
|
||||
"DISTRIBUTED RANDOMLY".to_string()
|
||||
} else {
|
||||
format!("DISTRIBUTED BY ({})", cols.join(", "))
|
||||
};
|
||||
map.entry((schema, table)).or_default().distribution = distribution;
|
||||
}
|
||||
} else if let Err(e) = dist_res {
|
||||
log::warn!("gp_distribution_policy fetch failed: {}", e);
|
||||
}
|
||||
|
||||
if let Ok(rows) = storage_res {
|
||||
for r in &rows {
|
||||
let schema: String = r.get(0);
|
||||
let table: String = r.get(1);
|
||||
let storage: String = r.get(2);
|
||||
map.entry((schema, table)).or_default().storage = Some(storage);
|
||||
}
|
||||
} else if let Err(e) = storage_res {
|
||||
log::warn!("Greenplum storage fetch failed: {}", e);
|
||||
}
|
||||
|
||||
map
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
|
||||
use crate::commands::ai::{
|
||||
fetch_column_comments, fetch_columns, fetch_enum_types, fetch_foreign_keys_raw,
|
||||
fetch_table_comments, fetch_unique_constraints, format_table_block, ColumnInfo,
|
||||
fetch_gp_table_extras, fetch_table_comments, fetch_unique_constraints, format_table_block,
|
||||
ColumnInfo,
|
||||
};
|
||||
use crate::commands::connections::{load_connection_config, switch_database_core};
|
||||
use crate::commands::saved_queries::{list_saved_queries_core, save_query_core};
|
||||
@@ -219,6 +220,8 @@ async fn get_columns_postgres(
|
||||
requested: &[(String, String, String)],
|
||||
) -> TuskResult<String> {
|
||||
let pool = state.get_pool(connection_id).await?;
|
||||
let is_greenplum = matches!(state.get_flavor(connection_id).await, DbFlavor::Greenplum);
|
||||
let gp_major = state.get_gp_major(connection_id).await.unwrap_or(7);
|
||||
|
||||
let (col_res, fk_res, enum_res, tbl_comm_res, col_comm_res, unique_res) = tokio::join!(
|
||||
fetch_columns(&pool),
|
||||
@@ -234,6 +237,11 @@ async fn get_columns_postgres(
|
||||
let tbl_comments = tbl_comm_res.unwrap_or_default();
|
||||
let col_comments = col_comm_res.unwrap_or_default();
|
||||
let uniques = unique_res.unwrap_or_default();
|
||||
let gp_extras = if is_greenplum {
|
||||
Some(fetch_gp_table_extras(&pool, gp_major).await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Build (schema, table) → Vec<ColumnInfo>
|
||||
let mut by_table: BTreeMap<(String, String), Vec<ColumnInfo>> = BTreeMap::new();
|
||||
@@ -282,6 +290,7 @@ async fn get_columns_postgres(
|
||||
&unique_map,
|
||||
&varchar_values,
|
||||
&jsonb_keys,
|
||||
gp_extras.as_ref(),
|
||||
&mut output,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -99,6 +99,7 @@ async fn close_connection(state: &AppState, id: &str) {
|
||||
let mut flavors = state.db_flavors.write().await;
|
||||
flavors.remove(id);
|
||||
drop(flavors);
|
||||
state.gp_majors.write().await.remove(id);
|
||||
state.invalidate_chat_caches_for(id).await;
|
||||
}
|
||||
|
||||
@@ -196,6 +197,19 @@ pub async fn connect(
|
||||
} else {
|
||||
DbFlavor::PostgreSQL
|
||||
};
|
||||
// Pull the GP major (6 or 7) from a string like
|
||||
// "PostgreSQL 12.12 (Greenplum Database 7.1.0 ...)".
|
||||
// GP6 and GP7 expose very different system catalogs, so downstream
|
||||
// schema-introspection code branches on this.
|
||||
let gp_major: Option<u8> = if flavor == DbFlavor::Greenplum {
|
||||
version
|
||||
.split("Greenplum Database ")
|
||||
.nth(1)
|
||||
.and_then(|tail| tail.split('.').next())
|
||||
.and_then(|major| major.parse::<u8>().ok())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
state.pools.write().await.insert(config.id.clone(), pool);
|
||||
state.read_only.write().await.insert(config.id.clone(), true);
|
||||
state
|
||||
@@ -203,6 +217,11 @@ pub async fn connect(
|
||||
.write()
|
||||
.await
|
||||
.insert(config.id.clone(), flavor);
|
||||
if let Some(m) = gp_major {
|
||||
state.gp_majors.write().await.insert(config.id.clone(), m);
|
||||
} else {
|
||||
state.gp_majors.write().await.remove(&config.id);
|
||||
}
|
||||
Ok(ConnectResult { version, flavor })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,9 @@ pub struct AppState {
|
||||
pub ch_clients: RwLock<HashMap<String, Arc<ChClient>>>,
|
||||
pub read_only: RwLock<HashMap<String, bool>>,
|
||||
pub db_flavors: RwLock<HashMap<String, DbFlavor>>,
|
||||
/// Greenplum major version (6 or 7), tracked separately because GP6 and GP7
|
||||
/// expose very different system catalogs (GP6 = PG9.4 base, GP7 = PG12 base).
|
||||
pub gp_majors: RwLock<HashMap<String, u8>>,
|
||||
/// Legacy cache used by generate_sql/explain_sql/fix_sql_error — full DDL.
|
||||
pub schema_cache: RwLock<HashMap<String, SchemaCacheEntry>>,
|
||||
/// Chat v2 caches: lite overview per connection.
|
||||
@@ -65,6 +68,7 @@ impl AppState {
|
||||
ch_clients: RwLock::new(HashMap::new()),
|
||||
read_only: RwLock::new(HashMap::new()),
|
||||
db_flavors: RwLock::new(HashMap::new()),
|
||||
gp_majors: RwLock::new(HashMap::new()),
|
||||
schema_cache: RwLock::new(HashMap::new()),
|
||||
overview_cache: RwLock::new(HashMap::new()),
|
||||
tables_by_db_cache: RwLock::new(HashMap::new()),
|
||||
@@ -116,6 +120,12 @@ impl AppState {
|
||||
map.get(id).copied().unwrap_or(DbFlavor::PostgreSQL)
|
||||
}
|
||||
|
||||
/// Returns the Greenplum major version (6 or 7) for a connection, or None
|
||||
/// for non-GP connections / when the version string couldn't be parsed.
|
||||
pub async fn get_gp_major(&self, id: &str) -> Option<u8> {
|
||||
self.gp_majors.read().await.get(id).copied()
|
||||
}
|
||||
|
||||
pub async fn get_schema_cache(&self, connection_id: &str) -> Option<String> {
|
||||
let cache = self.schema_cache.read().await;
|
||||
cache.get(connection_id).and_then(|entry| {
|
||||
|
||||
Reference in New Issue
Block a user