feat: add cross-database entity lookup for searching column values across all databases

Enables searching for a specific column value (e.g. carrier_id=123) across all databases on a PostgreSQL server. The backend creates temporary connection pools per database (semaphore-limited to 5), queries information_schema for matching columns, and executes read-only SELECTs with real-time progress events. Results are grouped by database/table in a new "Entity Lookup" tab.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-13 17:28:33 +03:00
parent a2371f00df
commit d5cff8bd5e
13 changed files with 1030 additions and 5 deletions

View File

@@ -0,0 +1,367 @@
use crate::commands::queries::pg_value_to_json;
use crate::error::TuskResult;
use crate::models::connection::ConnectionConfig;
use crate::models::lookup::{
EntityLookupResult, LookupDatabaseResult, LookupProgress, LookupTableMatch,
};
use crate::utils::escape_ident;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Column, Row, TypeInfo};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tauri::{AppHandle, Emitter};
use tokio::sync::Semaphore;
/// A table discovered via `information_schema.columns` that contains the
/// column being searched for; one candidate per (schema, table) pair.
struct TableCandidate {
    /// Schema the table lives in (e.g. "public").
    schema: String,
    /// Unqualified table name.
    table: String,
    /// Declared data type of the matching column (carried through to the
    /// result for display; not used to build queries).
    data_type: String,
}
/// Opens a short-lived, two-connection pool against `database` and delegates
/// the actual column search to `search_database_inner`, enforcing a 30-second
/// overall timeout per database.
///
/// Connection failures and timeouts are reported through the returned
/// `LookupDatabaseResult::error` rather than propagated, so one unreachable
/// database never aborts the whole lookup. `search_time_ms` is always stamped
/// with the wall-clock time spent here.
async fn search_database(
    config: &ConnectionConfig,
    database: &str,
    column_name: &str,
    value: &str,
) -> LookupDatabaseResult {
    let started = Instant::now();

    // Builds a failure result with the elapsed time filled in.
    let fail = |message: String| LookupDatabaseResult {
        database: database.to_string(),
        tables: vec![],
        error: Some(message),
        search_time_ms: started.elapsed().as_millis(),
    };

    // Point a clone of the connection config at the target database.
    let mut target = config.clone();
    target.database = database.to_string();

    let pool = match PgPoolOptions::new()
        .max_connections(2)
        .acquire_timeout(std::time::Duration::from_secs(5))
        .connect(&target.connection_url())
        .await
    {
        Ok(pool) => pool,
        Err(e) => return fail(format!("Connection failed: {}", e)),
    };

    // Bound the whole per-database scan; close the pool either way.
    let outcome = tokio::time::timeout(
        std::time::Duration::from_secs(30),
        search_database_inner(&pool, database, column_name, value),
    )
    .await;
    pool.close().await;

    match outcome {
        Ok(mut result) => {
            result.search_time_ms = started.elapsed().as_millis();
            result
        }
        Err(_) => fail("Timeout (30s)".to_string()),
    }
}
/// Searches one already-connected database for rows whose `column_name`,
/// cast to text, equals `value`.
///
/// First queries `information_schema.columns` for every user table containing
/// the column, then for each candidate runs a read-only transaction with a
/// sampled `SELECT * ... LIMIT 50` plus a `COUNT(*)` of all matches. Tables
/// with zero matching rows are omitted; per-table failures are logged and
/// skipped so one broken table cannot abort the scan.
///
/// `search_time_ms` is left at 0 here — the caller (`search_database`) stamps
/// the total elapsed time.
async fn search_database_inner(
    pool: &sqlx::PgPool,
    database: &str,
    column_name: &str,
    value: &str,
) -> LookupDatabaseResult {
    // Find tables that have this column. ORDER BY makes the candidate (and
    // therefore the result) ordering deterministic across runs.
    let candidates = match sqlx::query_as::<_, (String, String, String)>(
        "SELECT table_schema, table_name, data_type \
         FROM information_schema.columns \
         WHERE column_name = $1 \
         AND table_schema NOT IN ('pg_catalog', 'information_schema', 'pg_toast') \
         ORDER BY table_schema, table_name",
    )
    .bind(column_name)
    .fetch_all(pool)
    .await
    {
        Ok(rows) => rows
            .into_iter()
            .map(|(schema, table, data_type)| TableCandidate {
                schema,
                table,
                data_type,
            })
            .collect::<Vec<_>>(),
        Err(e) => {
            return LookupDatabaseResult {
                database: database.to_string(),
                tables: vec![],
                error: Some(format!("Schema query failed: {}", e)),
                search_time_ms: 0,
            };
        }
    };
    let mut tables = Vec::new();
    for candidate in &candidates {
        // Identifiers cannot be bound as SQL parameters, so they are escaped
        // and interpolated; the search value itself is always bound ($1).
        let qualified = format!(
            "{}.{}",
            escape_ident(&candidate.schema),
            escape_ident(&candidate.table)
        );
        let col_ident = escape_ident(column_name);
        // Casting the column to text lets one string value match columns of
        // any type (int, uuid, text, ...).
        let select_sql = format!(
            "SELECT * FROM {} WHERE {}::text = $1 LIMIT 50",
            qualified, col_ident
        );
        let count_sql = format!(
            "SELECT COUNT(*) FROM {} WHERE {}::text = $1",
            qualified, col_ident
        );
        let mut tx = match pool.begin().await {
            Ok(tx) => tx,
            Err(e) => {
                // Bug fix: this path previously pushed a phantom
                // LookupTableMatch with empty columns/rows, inflating the
                // matched-table count with an entry that contained no data.
                // Log and skip instead, consistent with the other per-table
                // failure paths below.
                log::warn!(
                    "Failed to begin tx for {}.{}: {}",
                    candidate.schema,
                    candidate.table,
                    e
                );
                continue;
            }
        };
        // Defense in depth: only SELECTs are issued, but forcing the
        // transaction read-only guarantees the interpolated SQL can't write.
        if let Err(e) = sqlx::query("SET TRANSACTION READ ONLY")
            .execute(&mut *tx)
            .await
        {
            let _ = tx.rollback().await;
            log::warn!("Failed SET TRANSACTION READ ONLY: {}", e);
            continue;
        }
        let rows_result = sqlx::query(&select_sql)
            .bind(value)
            .fetch_all(&mut *tx)
            .await;
        let count_result: Result<i64, _> = sqlx::query_scalar(&count_sql)
            .bind(value)
            .fetch_one(&mut *tx)
            .await;
        // Nothing was written, so rollback is free and always safe.
        let _ = tx.rollback().await;
        match rows_result {
            Ok(rows) if !rows.is_empty() => {
                // Column names/types come from the first row's metadata.
                let mut col_names = Vec::new();
                let mut col_types = Vec::new();
                if let Some(first) = rows.first() {
                    for col in first.columns() {
                        col_names.push(col.name().to_string());
                        col_types.push(col.type_info().name().to_string());
                    }
                }
                let result_rows: Vec<Vec<serde_json::Value>> = rows
                    .iter()
                    .map(|row| {
                        (0..col_names.len())
                            .map(|i| pg_value_to_json(row, i))
                            .collect()
                    })
                    .collect();
                let row_count = result_rows.len();
                // If COUNT(*) failed, fall back to the sampled row count.
                let total_count = count_result.unwrap_or(row_count as i64);
                tables.push(LookupTableMatch {
                    schema: candidate.schema.clone(),
                    table: candidate.table.clone(),
                    column_type: candidate.data_type.clone(),
                    columns: col_names,
                    types: col_types,
                    rows: result_rows,
                    row_count,
                    total_count,
                });
            }
            Ok(_) => {
                // No rows matched — table intentionally omitted from results.
            }
            Err(e) => {
                // Likely a cast or permission error on this table; log and
                // continue with the remaining candidates.
                log::warn!(
                    "Query failed for {}.{}: {}",
                    candidate.schema,
                    candidate.table,
                    e
                );
            }
        }
    }
    LookupDatabaseResult {
        database: database.to_string(),
        tables,
        error: None,
        search_time_ms: 0,
    }
}
/// Tauri command: searches every (or a chosen subset of) database on the
/// PostgreSQL server for rows whose `column_name`, cast to text, equals
/// `value`.
///
/// Flow:
/// 1. Connects once to list all non-template databases from `pg_database`.
/// 2. Spawns one task per database, capped at 5 concurrent scans by a
///    semaphore; each task emits `"lookup-progress"` events (status
///    `"searching"`, then `"done"` / `"error"`) tagged with `lookup_id` so
///    the frontend can correlate events with this invocation.
/// 3. Collects per-database results, sorts databases with matches first
///    (then alphabetically), and returns aggregate totals.
///
/// # Errors
/// Returns `Err` only if the initial connection or the database-list query
/// fails; per-database search failures are reported inside the corresponding
/// `LookupDatabaseResult::error` instead.
#[tauri::command]
pub async fn entity_lookup(
    app: AppHandle,
    config: ConnectionConfig,
    column_name: String,
    value: String,
    databases: Option<Vec<String>>,
    lookup_id: String,
) -> TuskResult<EntityLookupResult> {
    let start = Instant::now();
    // 1. Get list of databases — a single-connection pool suffices since we
    // run one catalog query and close immediately.
    let url = config.connection_url();
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .acquire_timeout(std::time::Duration::from_secs(5))
        .connect(&url)
        .await
        .map_err(crate::error::TuskError::Database)?;
    let db_names: Vec<String> = sqlx::query_scalar(
        "SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname",
    )
    .fetch_all(&pool)
    .await
    .map_err(crate::error::TuskError::Database)?;
    pool.close().await;
    // Filter if specific databases requested; otherwise search all of them.
    let db_names: Vec<String> = if let Some(ref filter) = databases {
        db_names
            .into_iter()
            .filter(|d| filter.contains(d))
            .collect()
    } else {
        db_names
    };
    let total = db_names.len();
    // Shared completion counter; Relaxed ordering is sufficient for a
    // monotonically increasing progress count with no other data attached.
    let completed = Arc::new(AtomicUsize::new(0));
    // Cap concurrent per-database scans (each opens its own pool) at 5.
    let semaphore = Arc::new(Semaphore::new(5));
    // 2. Parallel search across databases — one spawned task each.
    let mut handles = Vec::new();
    for db_name in db_names {
        let config = config.clone();
        let column_name = column_name.clone();
        let value = value.clone();
        let lookup_id = lookup_id.clone();
        let app = app.clone();
        let semaphore = semaphore.clone();
        let completed = completed.clone();
        let handle = tokio::spawn(async move {
            // Holding the permit for the task's lifetime enforces the cap.
            let _permit = semaphore.acquire().await.unwrap();
            // Emit "searching" progress before any work; emit failures are
            // deliberately ignored (progress is best-effort).
            let _ = app.emit(
                "lookup-progress",
                LookupProgress {
                    lookup_id: lookup_id.clone(),
                    database: db_name.clone(),
                    status: "searching".to_string(),
                    tables_found: 0,
                    rows_found: 0,
                    error: None,
                    // Snapshot may lag slightly under concurrency; fine for
                    // a progress indicator.
                    completed: completed.load(Ordering::Relaxed),
                    total,
                },
            );
            let result = search_database(&config, &db_name, &column_name, &value).await;
            // fetch_add returns the previous value, so +1 = "done so far".
            let done = completed.fetch_add(1, Ordering::Relaxed) + 1;
            let status = if result.error.is_some() {
                "error"
            } else {
                "done"
            };
            let _ = app.emit(
                "lookup-progress",
                LookupProgress {
                    lookup_id: lookup_id.clone(),
                    database: db_name.clone(),
                    status: status.to_string(),
                    tables_found: result.tables.len(),
                    rows_found: result.tables.iter().map(|t| t.row_count).sum(),
                    error: result.error.clone(),
                    completed: done,
                    total,
                },
            );
            result
        });
        handles.push(handle);
    }
    // 3. Collect results. A panicked/cancelled task is logged and dropped
    // rather than failing the whole lookup.
    let mut all_results = Vec::new();
    for handle in handles {
        match handle.await {
            Ok(result) => all_results.push(result),
            Err(e) => {
                log::error!("Join error: {}", e);
            }
        }
    }
    // Sort: databases with matches first, then by name.
    all_results.sort_by(|a, b| {
        let a_has = !a.tables.is_empty();
        let b_has = !b.tables.is_empty();
        b_has.cmp(&a_has).then(a.database.cmp(&b.database))
    });
    // Aggregate totals across all searched databases.
    let total_databases_searched = all_results.len();
    let total_tables_matched: usize = all_results.iter().map(|d| d.tables.len()).sum();
    let total_rows_found: usize = all_results
        .iter()
        .flat_map(|d| d.tables.iter())
        .map(|t| t.row_count)
        .sum();
    Ok(EntityLookupResult {
        column_name,
        value,
        databases: all_results,
        total_databases_searched,
        total_tables_matched,
        total_rows_found,
        total_time_ms: start.elapsed().as_millis(),
    })
}

View File

@@ -1,7 +1,9 @@
// Tauri command modules, one per feature area.
pub mod ai;
pub mod connections;
pub mod data;
pub mod export;
pub mod history;
// Cross-database entity lookup (registers `lookup::entity_lookup`).
pub mod lookup;
pub mod management;
pub mod queries;
pub mod saved_queries;

View File

@@ -88,6 +88,13 @@ pub fn run() {
commands::saved_queries::list_saved_queries,
commands::saved_queries::save_query,
commands::saved_queries::delete_saved_query,
// ai
commands::ai::get_ai_settings,
commands::ai::save_ai_settings,
commands::ai::list_ollama_models,
commands::ai::generate_sql,
// lookup
commands::lookup::entity_lookup,
])
.run(tauri::generate_context!())
.expect("error while running tauri application");

View File

@@ -0,0 +1,44 @@
use serde::{Deserialize, Serialize};
/// One table in which the searched column value was found, with a sample of
/// matching rows (serialized to the frontend via serde).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LookupTableMatch {
    /// Schema containing the table.
    pub schema: String,
    /// Table name, unqualified.
    pub table: String,
    /// Declared data type of the searched column in this table.
    pub column_type: String,
    /// Column names of the sampled rows, in result order.
    pub columns: Vec<String>,
    /// Postgres type names, parallel to `columns`.
    pub types: Vec<String>,
    /// Sampled matching rows as JSON values; row-major, parallel to `columns`.
    pub rows: Vec<Vec<serde_json::Value>>,
    /// Number of rows actually returned in `rows` (sample, may be capped).
    pub row_count: usize,
    /// Total matching rows in the table per COUNT(*); may exceed `row_count`.
    pub total_count: i64,
}
/// Per-database outcome of an entity lookup: either a list of table matches
/// or an error message (connection failure, timeout, schema-query failure).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LookupDatabaseResult {
    /// Database that was searched.
    pub database: String,
    /// Tables where matches were found; empty when none or on error.
    pub tables: Vec<LookupTableMatch>,
    /// Human-readable failure description; `None` on success.
    pub error: Option<String>,
    /// Wall-clock time spent searching this database, in milliseconds.
    pub search_time_ms: u128,
}
/// Final aggregated result of an entity lookup across all searched databases,
/// returned by the `entity_lookup` command.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntityLookupResult {
    /// Column name that was searched for.
    pub column_name: String,
    /// Value that was matched (compared against the column cast to text).
    pub value: String,
    /// Per-database results; databases with matches sorted first.
    pub databases: Vec<LookupDatabaseResult>,
    /// Number of databases actually searched.
    pub total_databases_searched: usize,
    /// Total tables with at least one match, across all databases.
    pub total_tables_matched: usize,
    /// Total sampled rows returned across all databases/tables.
    pub total_rows_found: usize,
    /// Total wall-clock time of the whole lookup, in milliseconds.
    pub total_time_ms: u128,
}
/// Payload of the `"lookup-progress"` event emitted while a lookup runs, so
/// the frontend can render real-time per-database status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LookupProgress {
    /// Correlates the event with a specific `entity_lookup` invocation.
    pub lookup_id: String,
    /// Database this event refers to.
    pub database: String,
    /// One of "searching", "done", or "error" (as emitted by the backend).
    pub status: String,
    /// Tables matched so far for this database (0 while searching).
    pub tables_found: usize,
    /// Sampled rows found so far for this database (0 while searching).
    pub rows_found: usize,
    /// Failure description when `status` is "error".
    pub error: Option<String>,
    /// Databases finished so far (may be a slightly stale snapshot).
    pub completed: usize,
    /// Total number of databases being searched.
    pub total: usize,
}

View File

@@ -1,5 +1,7 @@
// Data-model modules shared across commands.
pub mod ai;
pub mod connection;
pub mod history;
// Result and progress types for cross-database entity lookup.
pub mod lookup;
pub mod management;
pub mod query_result;
pub mod saved_queries;