use crate::commands::queries::pg_value_to_json; use crate::error::{TuskError, TuskResult}; use crate::models::query_result::PaginatedQueryResult; use crate::state::AppState; use crate::utils::escape_ident; use serde_json::Value; use sqlx::{Column, Row, TypeInfo}; use std::sync::Arc; use std::time::Instant; use tauri::State; #[tauri::command] pub async fn get_table_data( state: State<'_, Arc>, connection_id: String, schema: String, table: String, page: u32, page_size: u32, sort_column: Option, sort_direction: Option, filter: Option, ) -> TuskResult { let pool = state.get_pool(&connection_id).await?; let qualified = format!("{}.{}", escape_ident(&schema), escape_ident(&table)); let mut where_clause = String::new(); if let Some(ref f) = filter { if !f.trim().is_empty() { where_clause = format!(" WHERE {}", f); } } let mut order_clause = String::new(); if let Some(ref col) = sort_column { let dir = sort_direction.as_deref().unwrap_or("ASC"); let dir = if dir.eq_ignore_ascii_case("desc") { "DESC" } else { "ASC" }; order_clause = format!(" ORDER BY {} {}", escape_ident(col), dir); } let offset = (page.saturating_sub(1)) * page_size; let data_sql = format!( "SELECT *, ctid::text FROM {}{}{} LIMIT {} OFFSET {}", qualified, where_clause, order_clause, page_size, offset ); let count_sql = format!("SELECT COUNT(*) FROM {}{}", qualified, where_clause); let start = Instant::now(); // Always run table data queries in a read-only transaction to prevent // writable CTEs or other mutation via the raw filter parameter. let mut tx = (&pool).begin().await.map_err(TuskError::Database)?; sqlx::query("SET TRANSACTION READ ONLY") .execute(&mut *tx) .await .map_err(TuskError::Database)?; let rows = sqlx::query(&data_sql) .fetch_all(&mut *tx) .await .map_err(TuskError::Database)?; let count_row = sqlx::query(&count_sql) .fetch_one(&mut *tx) .await .map_err(TuskError::Database)?; tx.rollback().await.map_err(TuskError::Database)?; let execution_time_ms = start.elapsed().as_millis(); let total_rows: i64 = count_row.get(0); let mut all_columns = Vec::new(); let mut all_types = Vec::new(); if let Some(first_row) = rows.first() { for col in first_row.columns() { all_columns.push(col.name().to_string()); all_types.push(col.type_info().name().to_string()); } } // Find and strip the trailing ctid column let ctid_idx = all_columns.iter().rposition(|c| c == "ctid"); let mut ctids: Vec = Vec::new(); let (columns, types) = if let Some(idx) = ctid_idx { let mut cols = all_columns.clone(); let mut tps = all_types.clone(); cols.remove(idx); tps.remove(idx); (cols, tps) } else { (all_columns.clone(), all_types.clone()) }; let result_rows: Vec> = rows .iter() .map(|row| { if let Some(idx) = ctid_idx { let ctid_val: String = row.get(idx); ctids.push(ctid_val); } (0..all_columns.len()) .filter(|i| Some(*i) != ctid_idx) .map(|i| pg_value_to_json(row, i)) .collect() }) .collect(); let row_count = result_rows.len(); Ok(PaginatedQueryResult { columns, types, rows: result_rows, row_count, execution_time_ms, total_rows, page, page_size, ctids, }) } #[tauri::command] pub async fn update_row( state: State<'_, Arc>, connection_id: String, schema: String, table: String, pk_columns: Vec, pk_values: Vec, column: String, value: Value, ctid: Option, ) -> TuskResult<()> { if state.is_read_only(&connection_id).await { return Err(TuskError::ReadOnly); } let pool = state.get_pool(&connection_id).await?; let qualified = format!("{}.{}", escape_ident(&schema), escape_ident(&table)); let set_clause = format!("{} = $1", escape_ident(&column)); if pk_columns.is_empty() { // Fallback: use ctid for row identification let ctid_val = ctid.ok_or_else(|| { TuskError::Custom("Cannot update: no primary key and no ctid provided".into()) })?; let sql = format!( "UPDATE {} SET {} WHERE ctid = $2::tid", qualified, set_clause ); let mut query = sqlx::query(&sql); query = bind_json_value(query, &value); query = query.bind(ctid_val); query.execute(&pool).await.map_err(TuskError::Database)?; } else { let where_parts: Vec = pk_columns .iter() .enumerate() .map(|(i, col)| format!("{} = ${}", escape_ident(col), i + 2)) .collect(); let where_clause = where_parts.join(" AND "); let sql = format!( "UPDATE {} SET {} WHERE {}", qualified, set_clause, where_clause ); let mut query = sqlx::query(&sql); query = bind_json_value(query, &value); for pk_val in &pk_values { query = bind_json_value(query, pk_val); } query.execute(&pool).await.map_err(TuskError::Database)?; } Ok(()) } #[tauri::command] pub async fn insert_row( state: State<'_, Arc>, connection_id: String, schema: String, table: String, columns: Vec, values: Vec, ) -> TuskResult<()> { if state.is_read_only(&connection_id).await { return Err(TuskError::ReadOnly); } let pool = state.get_pool(&connection_id).await?; let qualified = format!("{}.{}", escape_ident(&schema), escape_ident(&table)); let col_list: Vec = columns.iter().map(|c| escape_ident(c)).collect(); let placeholders: Vec = (1..=columns.len()).map(|i| format!("${}", i)).collect(); let sql = format!( "INSERT INTO {} ({}) VALUES ({})", qualified, col_list.join(", "), placeholders.join(", ") ); let mut query = sqlx::query(&sql); for val in &values { query = bind_json_value(query, val); } query.execute(&pool).await.map_err(TuskError::Database)?; Ok(()) } #[tauri::command] pub async fn delete_rows( state: State<'_, Arc>, connection_id: String, schema: String, table: String, pk_columns: Vec, pk_values_list: Vec>, ctids: Option>, ) -> TuskResult { if state.is_read_only(&connection_id).await { return Err(TuskError::ReadOnly); } let pool = state.get_pool(&connection_id).await?; let qualified = format!("{}.{}", escape_ident(&schema), escape_ident(&table)); let mut total_affected: u64 = 0; // Wrap all deletes in a transaction for atomicity let mut tx = (&pool).begin().await.map_err(TuskError::Database)?; if pk_columns.is_empty() { // Fallback: use ctids for row identification let ctid_list = ctids.ok_or_else(|| { TuskError::Custom("Cannot delete: no primary key and no ctids provided".into()) })?; for ctid_val in &ctid_list { let sql = format!("DELETE FROM {} WHERE ctid = $1::tid", qualified); let query = sqlx::query(&sql).bind(ctid_val); let result = query.execute(&mut *tx).await.map_err(TuskError::Database)?; total_affected += result.rows_affected(); } } else { for pk_values in &pk_values_list { let where_parts: Vec = pk_columns .iter() .enumerate() .map(|(i, col)| format!("{} = ${}", escape_ident(col), i + 1)) .collect(); let where_clause = where_parts.join(" AND "); let sql = format!("DELETE FROM {} WHERE {}", qualified, where_clause); let mut query = sqlx::query(&sql); for val in pk_values { query = bind_json_value(query, val); } let result = query.execute(&mut *tx).await.map_err(TuskError::Database)?; total_affected += result.rows_affected(); } } tx.commit().await.map_err(TuskError::Database)?; Ok(total_affected) } pub(crate) fn bind_json_value<'q>( query: sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments>, value: &'q Value, ) -> sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments> { match value { Value::Null => query.bind(None::), Value::Bool(b) => query.bind(*b), Value::Number(n) => { if let Some(i) = n.as_i64() { query.bind(i) } else if let Some(f) = n.as_f64() { query.bind(f) } else { query.bind(n.to_string()) } } Value::String(s) => query.bind(s.as_str()), _ => query.bind(value.to_string()), } }