Files
tusk/src-tauri/src/commands/snapshot.rs
Aleksey Shakhmatov 9237c7dd8e perf: optimize HTTP client, DB queries, and clean up dead code
- Make reqwest::Client a LazyLock singleton instead of per-call allocation
- Parallelize 3 independent DB queries in get_index_advisor_report with tokio::join!
- Eliminate per-iteration Vec allocation in snapshot FK dependency loop
- Hoist try_local_pg_dump() call in SampleData clone mode to avoid double execution
- Evict stale schema cache entries on write to prevent unbounded memory growth
- Remove unused ValidationReport struct and config_path field
- Rename IndexRecommendationType variants to remove redundant suffix
2026-04-06 13:13:03 +03:00

360 lines
11 KiB
Rust

use crate::commands::ai::fetch_foreign_keys_raw;
use crate::commands::data::bind_json_value;
use crate::commands::queries::pg_value_to_json;
use crate::error::{TuskError, TuskResult};
use crate::models::snapshot::{
CreateSnapshotParams, RestoreSnapshotParams, Snapshot, SnapshotMetadata, SnapshotProgress,
SnapshotTableData, SnapshotTableMeta,
};
use crate::state::AppState;
use crate::utils::{escape_ident, topological_sort_tables};
use serde_json::Value;
use sqlx::{Column, Row, TypeInfo};
use std::fs;
use std::sync::Arc;
use tauri::{AppHandle, Emitter, Manager, State};
/// Exports the selected tables of a connection into a JSON snapshot file.
///
/// Progress is reported through `snapshot-progress` events tagged with
/// `snapshot_id`. Emitting is best effort: a failed emit is ignored and never
/// aborts the export.
///
/// # Arguments
/// * `app` - handle used to emit progress events.
/// * `state` - application state that provides the pool for `params.connection_id`.
/// * `params` - snapshot name, the tables to export, and whether tables
///   referenced through foreign keys should be pulled in as well.
/// * `snapshot_id` - identifier stored in the metadata and echoed in every event.
/// * `file_path` - destination of the pretty-printed JSON snapshot.
///
/// # Returns
/// The snapshot metadata, with `file_size_bytes` set to the byte length of the
/// serialized JSON that was written.
///
/// # Errors
/// Fails if the pool cannot be obtained, if fetching foreign keys or any export
/// query fails, or if serializing / writing the snapshot file fails.
///
/// # Notes
/// * Column names and types are taken from the first fetched row, so a table
///   with no rows is stored with empty `columns` / `column_types`.
/// * `connection_name` and `database` are written as empty strings here.
#[tauri::command]
pub async fn create_snapshot(
    app: AppHandle,
    state: State<'_, Arc<AppState>>,
    params: CreateSnapshotParams,
    snapshot_id: String,
    file_path: String,
) -> TuskResult<SnapshotMetadata> {
    let pool = state.get_pool(&params.connection_id).await?;

    let _ = app.emit(
        "snapshot-progress",
        SnapshotProgress {
            snapshot_id: snapshot_id.clone(),
            stage: "preparing".to_string(),
            percent: 5,
            message: "Preparing snapshot...".to_string(),
            detail: None,
        },
    );

    let mut target_tables: Vec<(String, String)> = params
        .tables
        .iter()
        .map(|t| (t.schema.clone(), t.table.clone()))
        .collect();

    // Fetch FK info once — used for both dependency expansion and topological sort
    let fk_rows = fetch_foreign_keys_raw(&pool).await?;

    if params.include_dependencies {
        // Repeat until a full pass adds nothing. A single pass only picks up
        // parents-of-parents when their FK rows happen to come later in
        // `fk_rows`, which made the resulting table set order-dependent.
        // The loop terminates because every productive pass adds a table that
        // was not yet present and the set of referenced tables is finite.
        let mut grew = true;
        while grew {
            grew = false;
            for fk in &fk_rows {
                let is_selected_child = target_tables
                    .iter()
                    .any(|(s, t)| s == &fk.schema && t == &fk.table);
                if !is_selected_child {
                    continue;
                }
                let parent_present = target_tables
                    .iter()
                    .any(|(s, t)| s == &fk.ref_schema && t == &fk.ref_table);
                if !parent_present {
                    target_tables.push((fk.ref_schema.clone(), fk.ref_table.clone()));
                    grew = true;
                }
            }
        }
    }

    // FK-based topological sort
    let fk_edges: Vec<(String, String, String, String)> = fk_rows
        .iter()
        .map(|fk| {
            (
                fk.schema.clone(),
                fk.table.clone(),
                fk.ref_schema.clone(),
                fk.ref_table.clone(),
            )
        })
        .collect();
    let sorted_tables = topological_sort_tables(&fk_edges, &target_tables);

    let mut tx = pool.begin().await.map_err(TuskError::Database)?;
    // REPEATABLE READ makes every SELECT below observe the same committed state.
    // Under the default READ COMMITTED level each statement takes its own
    // snapshot, so concurrently modified tables could be exported in mutually
    // inconsistent states (e.g. child rows whose parent row is missing).
    sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY")
        .execute(&mut *tx)
        .await
        .map_err(TuskError::Database)?;

    let total_tables = sorted_tables.len();
    let mut snapshot_tables: Vec<SnapshotTableData> = Vec::new();
    let mut table_metas: Vec<SnapshotTableMeta> = Vec::new();
    let mut total_rows: u64 = 0;

    for (i, (schema, table)) in sorted_tables.iter().enumerate() {
        // Export progress occupies the 10..=90 range; `max(1)` guards the division.
        let percent = (10 + (i * 80 / total_tables.max(1))).min(90) as u8;
        let _ = app.emit(
            "snapshot-progress",
            SnapshotProgress {
                snapshot_id: snapshot_id.clone(),
                stage: "exporting".to_string(),
                percent,
                message: format!("Exporting {}.{}...", schema, table),
                detail: None,
            },
        );

        let qualified = format!("{}.{}", escape_ident(schema), escape_ident(table));
        let sql = format!("SELECT * FROM {}", qualified);
        let rows = sqlx::query(&sql)
            .fetch_all(&mut *tx)
            .await
            .map_err(TuskError::Database)?;

        // Column metadata is only available from a fetched row.
        let mut columns = Vec::new();
        let mut column_types = Vec::new();
        if let Some(first) = rows.first() {
            for col in first.columns() {
                columns.push(col.name().to_string());
                column_types.push(col.type_info().name().to_string());
            }
        }

        let data_rows: Vec<Vec<Value>> = rows
            .iter()
            .map(|row| {
                (0..columns.len())
                    .map(|col_idx| pg_value_to_json(row, col_idx))
                    .collect()
            })
            .collect();

        let row_count = data_rows.len() as u64;
        total_rows += row_count;

        table_metas.push(SnapshotTableMeta {
            schema: schema.clone(),
            table: table.clone(),
            row_count,
            columns: columns.clone(),
            column_types: column_types.clone(),
        });
        snapshot_tables.push(SnapshotTableData {
            schema: schema.clone(),
            table: table.clone(),
            columns,
            column_types,
            rows: data_rows,
        });
    }

    // Nothing was written, so the read-only transaction is simply discarded.
    tx.rollback().await.map_err(TuskError::Database)?;

    let metadata = SnapshotMetadata {
        id: snapshot_id.clone(),
        name: params.name.clone(),
        created_at: chrono::Utc::now().to_rfc3339(),
        connection_name: String::new(),
        database: String::new(),
        tables: table_metas,
        total_rows,
        file_size_bytes: 0,
        version: 1,
    };
    let snapshot = Snapshot {
        metadata: metadata.clone(),
        tables: snapshot_tables,
    };

    let _ = app.emit(
        "snapshot-progress",
        SnapshotProgress {
            snapshot_id: snapshot_id.clone(),
            stage: "saving".to_string(),
            percent: 95,
            message: "Saving snapshot file...".to_string(),
            detail: None,
        },
    );

    let json = serde_json::to_string_pretty(&snapshot)?;
    let file_size = json.len() as u64;
    fs::write(&file_path, json)?;

    // The metadata embedded in the file keeps `file_size_bytes: 0`; only the
    // value returned to the caller carries the real size.
    let mut final_metadata = metadata;
    final_metadata.file_size_bytes = file_size;

    let _ = app.emit(
        "snapshot-progress",
        SnapshotProgress {
            snapshot_id: snapshot_id.clone(),
            stage: "done".to_string(),
            percent: 100,
            message: "Snapshot created successfully".to_string(),
            detail: Some(format!("{} rows, {} tables", total_rows, total_tables)),
        },
    );

    Ok(final_metadata)
}
/// Restores a JSON snapshot file into the database behind `params.connection_id`.
///
/// All database work runs in a single transaction: either every table is
/// restored and the transaction commits, or an error leaves the transaction
/// uncommitted. Progress is reported through best-effort `snapshot-progress`
/// events tagged with `snapshot_id`.
///
/// # Arguments
/// * `app` - handle used to emit progress events.
/// * `state` - application state (read-only flag, pool lookup, schema cache).
/// * `params` - connection id, snapshot file path and the
///   `truncate_before_restore` switch.
/// * `snapshot_id` - identifier echoed in every progress event.
///
/// # Returns
/// The number of rows inserted.
///
/// # Errors
/// * `TuskError::ReadOnly` if the connection is flagged read-only.
/// * `TuskError::Custom` if a row in the file does not have exactly one value
///   per listed column.
/// * I/O, JSON and database errors from reading the file, truncating,
///   inserting or committing.
///
/// # Notes
/// NOTE(review): with `truncate_before_restore`, `TRUNCATE ... CASCADE` also
/// empties tables that reference the truncated ones, even when those tables are
/// not part of the snapshot and therefore are not re-populated — confirm this
/// is the intended contract.
#[tauri::command]
pub async fn restore_snapshot(
    app: AppHandle,
    state: State<'_, Arc<AppState>>,
    params: RestoreSnapshotParams,
    snapshot_id: String,
) -> TuskResult<u64> {
    if state.is_read_only(&params.connection_id).await {
        return Err(TuskError::ReadOnly);
    }

    let _ = app.emit(
        "snapshot-progress",
        SnapshotProgress {
            snapshot_id: snapshot_id.clone(),
            stage: "reading".to_string(),
            percent: 5,
            message: "Reading snapshot file...".to_string(),
            detail: None,
        },
    );

    let data = fs::read_to_string(&params.file_path)?;
    let snapshot: Snapshot = serde_json::from_str(&data)?;

    let pool = state.get_pool(&params.connection_id).await?;
    let mut tx = pool.begin().await.map_err(TuskError::Database)?;

    // Postpone FK checks to commit time; in PostgreSQL this only affects
    // constraints that were declared DEFERRABLE.
    sqlx::query("SET CONSTRAINTS ALL DEFERRED")
        .execute(&mut *tx)
        .await
        .map_err(TuskError::Database)?;

    // TRUNCATE in reverse order (children first)
    if params.truncate_before_restore {
        let _ = app.emit(
            "snapshot-progress",
            SnapshotProgress {
                snapshot_id: snapshot_id.clone(),
                stage: "truncating".to_string(),
                percent: 15,
                message: "Truncating existing data...".to_string(),
                detail: None,
            },
        );
        for table_data in snapshot.tables.iter().rev() {
            let qualified = format!(
                "{}.{}",
                escape_ident(&table_data.schema),
                escape_ident(&table_data.table)
            );
            let truncate_sql = format!("TRUNCATE {} CASCADE", qualified);
            sqlx::query(&truncate_sql)
                .execute(&mut *tx)
                .await
                .map_err(TuskError::Database)?;
        }
    }

    // INSERT in forward order (parents first)
    let total_tables = snapshot.tables.len();
    let mut total_inserted: u64 = 0;

    for (i, table_data) in snapshot.tables.iter().enumerate() {
        // Tables stored without columns or rows have nothing to insert.
        if table_data.columns.is_empty() || table_data.rows.is_empty() {
            continue;
        }

        // Insert progress occupies the 20..=95 range; `max(1)` guards the division.
        let percent = (20 + (i * 75 / total_tables.max(1))).min(95) as u8;
        let _ = app.emit(
            "snapshot-progress",
            SnapshotProgress {
                snapshot_id: snapshot_id.clone(),
                stage: "inserting".to_string(),
                percent,
                message: format!("Restoring {}.{}...", table_data.schema, table_data.table),
                detail: Some(format!("{} rows", table_data.rows.len())),
            },
        );

        // The snapshot file is external input: reject rows whose arity does not
        // match the column list up front, with an error that names the table
        // and row, instead of surfacing an opaque parameter-count failure from
        // the database. Returning here drops `tx` without committing.
        let expected_len = table_data.columns.len();
        if let Some(bad_row) = table_data
            .rows
            .iter()
            .position(|row| row.len() != expected_len)
        {
            return Err(TuskError::Custom(format!(
                "Snapshot table {}.{} is malformed: row {} has {} values, expected {}",
                table_data.schema,
                table_data.table,
                bad_row,
                table_data.rows[bad_row].len(),
                expected_len
            )));
        }

        let qualified = format!(
            "{}.{}",
            escape_ident(&table_data.schema),
            escape_ident(&table_data.table)
        );
        let col_list: Vec<String> = table_data.columns.iter().map(|c| escape_ident(c)).collect();
        let placeholders: Vec<String> = (1..=table_data.columns.len())
            .map(|n| format!("${}", n))
            .collect();
        let sql = format!(
            "INSERT INTO {} ({}) VALUES ({})",
            qualified,
            col_list.join(", "),
            placeholders.join(", ")
        );

        // One INSERT per row, reusing the same statement text for the table.
        for row in &table_data.rows {
            let mut query = sqlx::query(&sql);
            for val in row {
                query = bind_json_value(query, val);
            }
            query.execute(&mut *tx).await.map_err(TuskError::Database)?;
            total_inserted += 1;
        }
    }

    tx.commit().await.map_err(TuskError::Database)?;

    // Invalidate before announcing completion so that a listener reacting to
    // the "done" event cannot be served from the stale cache.
    state.invalidate_schema_cache(&params.connection_id).await;

    let _ = app.emit(
        "snapshot-progress",
        SnapshotProgress {
            snapshot_id: snapshot_id.clone(),
            stage: "done".to_string(),
            percent: 100,
            message: "Restore completed successfully".to_string(),
            detail: Some(format!("{} rows restored", total_inserted)),
        },
    );

    Ok(total_inserted)
}
/// Lists the metadata of every readable snapshot in `<app data dir>/snapshots`.
///
/// Only files with a `json` extension are considered. Files that cannot be
/// read or parsed as a `Snapshot` are skipped silently. `file_size_bytes` is
/// taken from the directory entry (0 if its metadata is unavailable).
///
/// # Returns
/// Snapshot metadata sorted by `created_at`, newest first; an empty list when
/// the snapshots directory does not exist.
///
/// # Errors
/// Fails if the app data directory cannot be resolved, or if the directory or
/// one of its entries cannot be enumerated.
#[tauri::command]
pub async fn list_snapshots(app: AppHandle) -> TuskResult<Vec<SnapshotMetadata>> {
    let snapshots_dir = app
        .path()
        .app_data_dir()
        .map_err(|e| TuskError::Custom(e.to_string()))?
        .join("snapshots");

    if !snapshots_dir.exists() {
        return Ok(Vec::new());
    }

    let mut found = Vec::new();
    for dir_entry in fs::read_dir(&snapshots_dir)? {
        let dir_entry = dir_entry?;
        let candidate = dir_entry.path();

        // Anything that is not a `*.json` file is ignored.
        if !candidate.extension().is_some_and(|ext| ext == "json") {
            continue;
        }
        // Unreadable or unparsable files are skipped rather than reported.
        let Ok(contents) = fs::read_to_string(&candidate) else {
            continue;
        };
        let Ok(parsed) = serde_json::from_str::<Snapshot>(&contents) else {
            continue;
        };

        let mut meta = parsed.metadata;
        meta.file_size_bytes = dir_entry.metadata().map(|m| m.len()).unwrap_or(0);
        found.push(meta);
    }

    // Descending by creation timestamp string.
    found.sort_by(|a, b| b.created_at.cmp(&a.created_at));
    Ok(found)
}
#[tauri::command]
pub async fn read_snapshot_metadata(file_path: String) -> TuskResult<SnapshotMetadata> {
let data = fs::read_to_string(&file_path)?;
let snapshot: Snapshot = serde_json::from_str(&data)?;
let mut meta = snapshot.metadata;
meta.file_size_bytes = fs::metadata(&file_path).map(|m| m.len()).unwrap_or(0);
Ok(meta)
}