use crate::error::{TuskError, TuskResult}; use crate::models::connection::ConnectionConfig; use crate::models::docker::{ CloneMode, CloneProgress, CloneResult, CloneToDockerParams, DockerStatus, TuskContainer, }; use crate::state::AppState; use std::fs; use std::sync::Arc; use tauri::{AppHandle, Emitter, Manager, State}; use tokio::process::Command; async fn docker_cmd(state: &AppState) -> Command { let host = state.docker_host.read().await; let mut cmd = Command::new("docker"); if let Some(ref h) = *host { cmd.args(["-H", h]); } cmd } fn docker_err(msg: impl Into) -> TuskError { TuskError::Docker(msg.into()) } fn emit_progress( app: &AppHandle, clone_id: &str, stage: &str, percent: u8, message: &str, detail: Option<&str>, ) { let _ = app.emit( "clone-progress", CloneProgress { clone_id: clone_id.to_string(), stage: stage.to_string(), percent, message: message.to_string(), detail: detail.map(|s| s.to_string()), }, ); } fn get_connections_path(app: &AppHandle) -> TuskResult { let dir = app .path() .app_data_dir() .map_err(|e| TuskError::Custom(e.to_string()))?; fs::create_dir_all(&dir)?; Ok(dir.join("connections.json")) } fn load_connection_config(app: &AppHandle, connection_id: &str) -> TuskResult { let path = get_connections_path(app)?; if !path.exists() { return Err(TuskError::ConnectionNotFound(connection_id.to_string())); } let data = fs::read_to_string(&path)?; let connections: Vec = serde_json::from_str(&data)?; connections .into_iter() .find(|c| c.id == connection_id) .ok_or_else(|| TuskError::ConnectionNotFound(connection_id.to_string())) } /// Shell-escape a string for use in single quotes fn shell_escape(s: &str) -> String { s.replace('\'', "'\\''") } #[tauri::command] pub async fn check_docker(state: State<'_, Arc>) -> TuskResult { let output = docker_cmd(&state) .await .args(["version", "--format", "{{.Server.Version}}"]) .output() .await; match output { Ok(out) => { if out.status.success() { let version = String::from_utf8_lossy(&out.stdout).trim().to_string(); Ok(DockerStatus { installed: true, daemon_running: true, version: Some(version), error: None, }) } else { let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string(); let daemon_running = !stderr.contains("Cannot connect") && !stderr.contains("connection refused"); Ok(DockerStatus { installed: true, daemon_running, version: None, error: Some(stderr), }) } } Err(_) => Ok(DockerStatus { installed: false, daemon_running: false, version: None, error: Some("Docker CLI not found. Please install Docker.".to_string()), }), } } #[tauri::command] pub async fn list_tusk_containers(state: State<'_, Arc>) -> TuskResult> { let output = docker_cmd(&state) .await .args([ "ps", "-a", "--filter", "label=tusk.managed=true", "--format", "{{.ID}}\t{{.Names}}\t{{.Status}}\t{{.Label \"tusk.pg-version\"}}\t{{.Label \"tusk.source-db\"}}\t{{.Label \"tusk.source-connection\"}}\t{{.CreatedAt}}\t{{.Ports}}", ]) .output() .await .map_err(|e| docker_err(format!("Failed to run docker ps: {}", e)))?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(docker_err(format!("docker ps failed: {}", stderr))); } let stdout = String::from_utf8_lossy(&output.stdout); let mut containers = Vec::new(); for line in stdout.lines() { if line.trim().is_empty() { continue; } let parts: Vec<&str> = line.split('\t').collect(); if parts.len() < 8 { continue; } let host_port = parse_host_port(parts[7]); containers.push(TuskContainer { container_id: parts[0].to_string(), name: parts[1].to_string(), status: parts[2].to_string(), host_port, pg_version: parts[3].to_string(), source_database: if parts[4].is_empty() { None } else { Some(parts[4].to_string()) }, source_connection: if parts[5].is_empty() { None } else { Some(parts[5].to_string()) }, created_at: if parts[6].is_empty() { None } else { Some(parts[6].to_string()) }, }); } Ok(containers) } fn parse_host_port(ports_str: &str) -> u16 { for part in ports_str.split(',') { let part = part.trim(); if let Some(arrow_pos) = part.find("->") { let before = &part[..arrow_pos]; if let Some(colon_pos) = before.rfind(':') { if let Ok(port) = before[colon_pos + 1..].parse::() { return port; } } } } 0 } #[tauri::command] pub async fn clone_to_docker( app: AppHandle, state: State<'_, Arc>, params: CloneToDockerParams, clone_id: String, ) -> TuskResult { let state = state.inner().clone(); let app_clone = app.clone(); tokio::spawn(async move { do_clone(&app_clone, &state, ¶ms, &clone_id).await }) .await .map_err(|e| docker_err(format!("Clone task panicked: {}", e)))? } /// Build a docker Command respecting the remote host setting fn docker_cmd_sync(docker_host: &Option) -> Command { let mut cmd = Command::new("docker"); if let Some(ref h) = docker_host { cmd.args(["-H", h]); } cmd } async fn check_docker_internal(docker_host: &Option) -> TuskResult { let output = docker_cmd_sync(docker_host) .args(["version", "--format", "{{.Server.Version}}"]) .output() .await; match output { Ok(out) => { if out.status.success() { let version = String::from_utf8_lossy(&out.stdout).trim().to_string(); Ok(DockerStatus { installed: true, daemon_running: true, version: Some(version), error: None, }) } else { let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string(); let daemon_running = !stderr.contains("Cannot connect") && !stderr.contains("connection refused"); Ok(DockerStatus { installed: true, daemon_running, version: None, error: Some(stderr), }) } } Err(_) => Ok(DockerStatus { installed: false, daemon_running: false, version: None, error: Some("Docker CLI not found. Please install Docker.".to_string()), }), } } async fn do_clone( app: &AppHandle, state: &Arc, params: &CloneToDockerParams, clone_id: &str, ) -> TuskResult { let docker_host = state.docker_host.read().await.clone(); // Step 1: Check Docker emit_progress(app, clone_id, "checking", 5, "Checking Docker availability...", None); let status = check_docker_internal(&docker_host).await?; if !status.installed || !status.daemon_running { let msg = status .error .unwrap_or_else(|| "Docker is not available".to_string()); emit_progress(app, clone_id, "error", 5, &msg, None); return Err(docker_err(msg)); } // Step 2: Find available port emit_progress(app, clone_id, "port", 10, "Finding available port...", None); let host_port = match params.host_port { Some(p) => p, None => find_free_port().await?, }; emit_progress(app, clone_id, "port", 10, &format!("Using port {}", host_port), None); // Step 3: Create container emit_progress(app, clone_id, "container", 20, "Creating PostgreSQL container...", None); let pg_password = params.postgres_password.as_deref().unwrap_or("tusk"); let image = format!("postgres:{}", params.pg_version); let create_output = docker_cmd_sync(&docker_host) .args([ "run", "-d", "--name", ¶ms.container_name, "-p", &format!("{}:5432", host_port), "-e", &format!("POSTGRES_PASSWORD={}", pg_password), "-l", "tusk.managed=true", "-l", &format!("tusk.source-db={}", params.source_database), "-l", &format!("tusk.source-connection={}", params.source_connection_id), "-l", &format!("tusk.pg-version={}", params.pg_version), &image, ]) .output() .await .map_err(|e| docker_err(format!("Failed to create container: {}", e)))?; if !create_output.status.success() { let stderr = String::from_utf8_lossy(&create_output.stderr).trim().to_string(); emit_progress(app, clone_id, "error", 20, &format!("Failed to create container: {}", stderr), None); return Err(docker_err(format!("Failed to create container: {}", stderr))); } let container_id = String::from_utf8_lossy(&create_output.stdout).trim().to_string(); // Step 4: Wait for PostgreSQL to be ready emit_progress(app, clone_id, "waiting", 30, "Waiting for PostgreSQL to be ready...", None); wait_for_pg_ready(&docker_host, ¶ms.container_name, 30).await?; emit_progress(app, clone_id, "waiting", 35, "PostgreSQL is ready", None); // Step 5: Create target database emit_progress(app, clone_id, "database", 35, &format!("Creating database '{}'...", params.source_database), None); let create_db_output = docker_cmd_sync(&docker_host) .args([ "exec", ¶ms.container_name, "psql", "-U", "postgres", "-c", &format!("CREATE DATABASE \"{}\"", params.source_database), ]) .output() .await .map_err(|e| docker_err(format!("Failed to create database: {}", e)))?; if !create_db_output.status.success() { let stderr = String::from_utf8_lossy(&create_db_output.stderr).trim().to_string(); if !stderr.contains("already exists") { emit_progress(app, clone_id, "error", 35, &format!("Failed to create database: {}", stderr), None); return Err(docker_err(format!("Failed to create database: {}", stderr))); } } // Step 6: Get source connection URL (using the specific database to clone) emit_progress(app, clone_id, "dump", 40, "Preparing data transfer...", None); let source_config = load_connection_config(app, ¶ms.source_connection_id)?; let source_url = source_config.connection_url_for_db(¶ms.source_database); emit_progress( app, clone_id, "dump", 40, &format!("Source: {}@{}:{}/{}", source_config.user, source_config.host, source_config.port, params.source_database), None, ); // Step 7: Transfer data based on clone mode match params.clone_mode { CloneMode::SchemaOnly => { emit_progress(app, clone_id, "transfer", 45, "Dumping schema...", None); transfer_schema_only(app, clone_id, &source_url, ¶ms.container_name, ¶ms.source_database, ¶ms.pg_version, &docker_host).await?; } CloneMode::FullClone => { emit_progress(app, clone_id, "transfer", 45, "Performing full database clone...", None); transfer_full_clone(app, clone_id, &source_url, ¶ms.container_name, ¶ms.source_database, ¶ms.pg_version, &docker_host).await?; } CloneMode::SampleData => { emit_progress(app, clone_id, "transfer", 45, "Dumping schema...", None); transfer_schema_only(app, clone_id, &source_url, ¶ms.container_name, ¶ms.source_database, ¶ms.pg_version, &docker_host).await?; emit_progress(app, clone_id, "transfer", 65, "Copying sample data...", None); let sample_rows = params.sample_rows.unwrap_or(1000); transfer_sample_data(app, clone_id, &source_url, ¶ms.container_name, ¶ms.source_database, ¶ms.pg_version, sample_rows, &docker_host).await?; } } // Step 8: Save connection in Tusk emit_progress(app, clone_id, "connection", 90, "Saving connection...", None); let connection_id = uuid::Uuid::new_v4().to_string(); let new_config = ConnectionConfig { id: connection_id.clone(), name: format!("{} (Docker clone)", params.source_database), host: "localhost".to_string(), port: host_port, user: "postgres".to_string(), password: pg_password.to_string(), database: params.source_database.clone(), ssl_mode: Some("disable".to_string()), color: Some("#06b6d4".to_string()), environment: Some("local".to_string()), }; save_connection_config(app, &new_config)?; let connection_url = format!( "postgres://postgres:{}@localhost:{}/{}", pg_password, host_port, params.source_database ); let container = TuskContainer { container_id: container_id[..12.min(container_id.len())].to_string(), name: params.container_name.clone(), status: "Up".to_string(), host_port, pg_version: params.pg_version.clone(), source_database: Some(params.source_database.clone()), source_connection: Some(params.source_connection_id.clone()), created_at: Some(chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()), }; let result = CloneResult { container, connection_id, connection_url, }; emit_progress(app, clone_id, "done", 100, "Clone completed successfully!", None); Ok(result) } async fn find_free_port() -> TuskResult { let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await .map_err(|e| docker_err(format!("Failed to find free port: {}", e)))?; let port = listener .local_addr() .map_err(|e| docker_err(format!("Failed to get port: {}", e)))? .port(); drop(listener); Ok(port) } async fn wait_for_pg_ready(docker_host: &Option, container_name: &str, timeout_secs: u64) -> TuskResult<()> { let start = std::time::Instant::now(); let timeout = std::time::Duration::from_secs(timeout_secs); loop { if start.elapsed() > timeout { return Err(docker_err("PostgreSQL did not become ready in time")); } let output = docker_cmd_sync(docker_host) .args(["exec", container_name, "pg_isready", "-U", "postgres"]) .output() .await; if let Ok(out) = output { if out.status.success() { return Ok(()); } } tokio::time::sleep(std::time::Duration::from_millis(500)).await; } } async fn try_local_pg_dump() -> bool { Command::new("pg_dump") .arg("--version") .output() .await .map(|o| o.status.success()) .unwrap_or(false) } /// Build the docker host flag string for shell commands fn docker_host_flag(docker_host: &Option) -> String { match docker_host { Some(h) => format!("-H '{}'", shell_escape(h)), None => String::new(), } } /// Build the pg_dump portion of a shell command fn pg_dump_shell_cmd(has_local: bool, pg_version: &str, extra_args: &str, source_url: &str, docker_host: &Option) -> String { let escaped_url = shell_escape(source_url); if has_local { format!("pg_dump {} '{}'", extra_args, escaped_url) } else { let host_flag = docker_host_flag(docker_host); format!( "docker {} run --rm --network=host postgres:{} pg_dump {} '{}'", host_flag, pg_version, extra_args, escaped_url ) } } async fn run_pipe_cmd( app: &AppHandle, clone_id: &str, pipe_cmd: &str, label: &str, ) -> TuskResult { // Use bash with pipefail so pg_dump failures are not swallowed let wrapped = format!("set -o pipefail; {}", pipe_cmd); emit_progress(app, clone_id, "transfer", 50, label, None); let output = Command::new("bash") .args(["-c", &wrapped]) .output() .await .map_err(|e| docker_err(format!("{} failed to start: {}", label, e)))?; let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); // Always log stderr if present if !stderr.is_empty() { // Truncate for progress display (full log can be long) let short = if stderr.len() > 500 { format!("{}...", &stderr[..500]) } else { stderr.clone() }; emit_progress(app, clone_id, "transfer", 55, &format!("{}: stderr output", label), Some(&short)); } // Count DDL statements in stdout for feedback if !stdout.is_empty() { let creates = stdout.lines() .filter(|l| l.starts_with("CREATE") || l.starts_with("ALTER") || l.starts_with("SET")) .count(); if creates > 0 { emit_progress(app, clone_id, "transfer", 58, &format!("Applied {} SQL statements", creates), None); } } if !output.status.success() { let code = output.status.code().unwrap_or(-1); emit_progress( app, clone_id, "transfer", 55, &format!("{} exited with code {}", label, code), Some(&stderr), ); // Only hard-fail on connection / fatal errors if stderr.contains("FATAL") || stderr.contains("could not connect") || stderr.contains("No such file") || stderr.contains("password authentication failed") || stderr.contains("does not exist") || (stdout.is_empty() && stderr.is_empty()) { return Err(docker_err(format!("{} failed (exit {}): {}", label, code, stderr))); } } Ok(output) } async fn transfer_schema_only( app: &AppHandle, clone_id: &str, source_url: &str, container_name: &str, database: &str, pg_version: &str, docker_host: &Option, ) -> TuskResult<()> { let has_local = try_local_pg_dump().await; let label = if has_local { "local pg_dump" } else { "Docker-based pg_dump" }; emit_progress(app, clone_id, "transfer", 48, &format!("Using {} for schema...", label), None); let dump_cmd = pg_dump_shell_cmd(has_local, pg_version, "--schema-only --no-owner --no-acl", source_url, docker_host); let escaped_db = shell_escape(database); let host_flag = docker_host_flag(docker_host); let pipe_cmd = format!( "{} | docker {} exec -i '{}' psql -U postgres -d '{}'", dump_cmd, host_flag, shell_escape(container_name), escaped_db ); run_pipe_cmd(app, clone_id, &pipe_cmd, "Schema transfer").await?; emit_progress(app, clone_id, "transfer", 60, "Schema transferred successfully", None); Ok(()) } async fn transfer_full_clone( app: &AppHandle, clone_id: &str, source_url: &str, container_name: &str, database: &str, pg_version: &str, docker_host: &Option, ) -> TuskResult<()> { let has_local = try_local_pg_dump().await; let label = if has_local { "local pg_dump" } else { "Docker-based pg_dump" }; emit_progress(app, clone_id, "transfer", 48, &format!("Using {} for full clone...", label), None); // Use plain text format piped to psql (more reliable than -Fc | pg_restore through docker exec) let dump_cmd = pg_dump_shell_cmd(has_local, pg_version, "--no-owner --no-acl", source_url, docker_host); let escaped_db = shell_escape(database); let host_flag = docker_host_flag(docker_host); let pipe_cmd = format!( "{} | docker {} exec -i '{}' psql -U postgres -d '{}'", dump_cmd, host_flag, shell_escape(container_name), escaped_db ); run_pipe_cmd(app, clone_id, &pipe_cmd, "Full clone").await?; emit_progress(app, clone_id, "transfer", 85, "Full clone completed", None); Ok(()) } async fn transfer_sample_data( app: &AppHandle, clone_id: &str, source_url: &str, container_name: &str, database: &str, pg_version: &str, sample_rows: u32, docker_host: &Option, ) -> TuskResult<()> { // List tables from the target (schema already transferred) let target_output = docker_cmd_sync(docker_host) .args([ "exec", container_name, "psql", "-U", "postgres", "-d", database, "-t", "-A", "-c", "SELECT schemaname || '.' || tablename FROM pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema') ORDER BY schemaname, tablename", ]) .output() .await .map_err(|e| docker_err(format!("Failed to list tables: {}", e)))?; let tables_str = String::from_utf8_lossy(&target_output.stdout); let tables: Vec<&str> = tables_str.lines().filter(|l| !l.trim().is_empty()).collect(); let total = tables.len(); if total == 0 { emit_progress(app, clone_id, "transfer", 85, "No tables to copy data for", None); return Ok(()); } let has_local = try_local_pg_dump().await; for (i, qualified_table) in tables.iter().enumerate() { let pct = 65 + ((i * 20) / total.max(1)).min(20) as u8; emit_progress( app, clone_id, "transfer", pct, &format!("Copying sample data: {} ({}/{})", qualified_table, i + 1, total), None, ); let parts: Vec<&str> = qualified_table.splitn(2, '.').collect(); if parts.len() != 2 { continue; } let schema = parts[0]; let table = parts[1]; // Use COPY (SELECT ... LIMIT N) TO STDOUT piped to COPY ... FROM STDIN let copy_out_sql = format!( "\\copy (SELECT * FROM \\\"{}\\\".\\\"{}\\\" LIMIT {}) TO STDOUT", schema, table, sample_rows ); let copy_in_sql = format!( "\\copy \\\"{}\\\".\\\"{}\\\" FROM STDIN", schema, table ); let escaped_url = shell_escape(source_url); let escaped_container = shell_escape(container_name); let escaped_db = shell_escape(database); let host_flag = docker_host_flag(docker_host); let source_cmd = if has_local { format!("psql '{}' -c \"{}\"", escaped_url, copy_out_sql) } else { let image = format!("postgres:{}", pg_version); format!( "docker {} run --rm --network=host {} psql '{}' -c \"{}\"", host_flag, image, escaped_url, copy_out_sql ) }; let pipe_cmd = format!( "set -o pipefail; {} | docker {} exec -i '{}' psql -U postgres -d '{}' -c \"{}\"", source_cmd, host_flag, escaped_container, escaped_db, copy_in_sql ); let output = Command::new("bash") .args(["-c", &pipe_cmd]) .output() .await; match output { Ok(out) => { let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string(); if !stderr.is_empty() && (stderr.contains("ERROR") || stderr.contains("FATAL")) { emit_progress( app, clone_id, "transfer", pct, &format!("Warning: {}", qualified_table), Some(&stderr), ); } } Err(e) => { emit_progress( app, clone_id, "transfer", pct, &format!("Warning: failed to copy {}: {}", qualified_table, e), None, ); } } } emit_progress(app, clone_id, "transfer", 85, "Sample data transfer completed", None); Ok(()) } fn save_connection_config(app: &AppHandle, config: &ConnectionConfig) -> TuskResult<()> { let path = get_connections_path(app)?; let mut connections = if path.exists() { let data = fs::read_to_string(&path)?; serde_json::from_str::>(&data)? } else { vec![] }; connections.push(config.clone()); let data = serde_json::to_string_pretty(&connections)?; fs::write(&path, data)?; Ok(()) } #[tauri::command] pub async fn start_container(state: State<'_, Arc>, name: String) -> TuskResult<()> { let output = docker_cmd(&state) .await .args(["start", &name]) .output() .await .map_err(|e| docker_err(format!("Failed to start container: {}", e)))?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(docker_err(format!("Failed to start container: {}", stderr))); } Ok(()) } #[tauri::command] pub async fn stop_container(state: State<'_, Arc>, name: String) -> TuskResult<()> { let output = docker_cmd(&state) .await .args(["stop", &name]) .output() .await .map_err(|e| docker_err(format!("Failed to stop container: {}", e)))?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(docker_err(format!("Failed to stop container: {}", stderr))); } Ok(()) } #[tauri::command] pub async fn remove_container(state: State<'_, Arc>, name: String) -> TuskResult<()> { let output = docker_cmd(&state) .await .args(["rm", "-f", &name]) .output() .await .map_err(|e| docker_err(format!("Failed to remove container: {}", e)))?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(docker_err(format!("Failed to remove container: {}", stderr))); } Ok(()) }