"""Overlay profile manager.

A FastAPI application that stores named "profiles" of overlay files in a
SQLite database and deploys the active profile's files into ``OVERLAY_DIR``.
Profiles can be marked read-only or locked with a password; both states block
modification of the profile and its files.
"""
import hashlib
import hmac
import logging
import os
import shutil
import sqlite3
import tempfile
from contextlib import closing, contextmanager
from typing import Optional
from urllib.parse import quote

from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("overlay-manager")

# Every redirect issued by this app is prefixed with the same path the app is
# served under, so keep both in one constant.
ROOT_PATH = "/patch-manager"

app = FastAPI(root_path=ROOT_PATH)

os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

OVERLAY_DIR = os.path.abspath("/srv")

# Use the new mounted directory for the database
DB_DIR = "/app/db"
os.makedirs(DB_DIR, exist_ok=True)
DB_PATH = os.path.join(DB_DIR, "overlays.db")


# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------

def get_db_connection():
    """Open a new SQLite connection to ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    Foreign-key enforcement is switched on for the connection; SQLite leaves
    it off by default, in which case the ``ON DELETE CASCADE`` declared on
    ``files.profile_id`` would never run and deleted profiles would leave
    orphaned file rows behind.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    return conn


@contextmanager
def _open_db():
    """Yield a connection that is committed/rolled back AND closed on exit.

    ``sqlite3.Connection`` used directly as a context manager only manages the
    transaction; it does not close the connection. This wrapper adds the
    close so request handlers do not leak connections.
    """
    conn = get_db_connection()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def init_db():
    """Initializes the SQLite database and schema.

    Creates the ``profiles`` and ``files`` tables if missing, adds the
    ``description``, ``is_readonly`` and ``password`` columns to older
    databases, seeds two default profiles into an empty database, and
    otherwise marks the existing 'Vanilla (No Overlays)' profile read-only.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS profiles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                is_active BOOLEAN NOT NULL DEFAULT 0
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                profile_id INTEGER NOT NULL,
                filepath TEXT NOT NULL,
                content BLOB NOT NULL,
                FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE,
                UNIQUE(profile_id, filepath)
            )
        """)

        # Auto-migrate: Add description, is_readonly, and password columns if they don't exist
        cursor = conn.cursor()
        cursor.execute("PRAGMA table_info(profiles)")
        columns = {info[1] for info in cursor.fetchall()}

        if "description" not in columns:
            cursor.execute("ALTER TABLE profiles ADD COLUMN description TEXT DEFAULT ''")
        if "is_readonly" not in columns:
            cursor.execute("ALTER TABLE profiles ADD COLUMN is_readonly BOOLEAN NOT NULL DEFAULT 0")
        if "password" not in columns:
            cursor.execute("ALTER TABLE profiles ADD COLUMN password TEXT DEFAULT NULL")

        # Create default profiles if the DB is empty
        cursor.execute("SELECT count(*) FROM profiles")
        if cursor.fetchone()[0] == 0:
            cursor.execute("INSERT INTO profiles (name, description, is_active, is_readonly) VALUES ('Vanilla (No Overlays)', 'Baseline ArduPilot build without any modifications. Read-only.', 1, 1)")
            cursor.execute("INSERT INTO profiles (name, description, is_active, is_readonly) VALUES ('Custom Build Alpha', 'Experimental tuning parameters.', 0, 0)")
        else:
            # Lock the existing Vanilla profile for users upgrading
            cursor.execute("UPDATE profiles SET is_readonly = 1 WHERE name = 'Vanilla (No Overlays)'")


init_db()


def _with_lock_flag(row) -> dict:
    """Convert a profile row to a dict and add the derived ``is_locked`` flag."""
    profile = dict(row)
    profile['is_locked'] = bool(profile.get('password'))
    return profile


def _load_profiles(conn) -> list:
    """Return every profile (ordered by name) as dicts with ``is_locked`` set."""
    rows = conn.execute("SELECT * FROM profiles ORDER BY name").fetchall()
    return [_with_lock_flag(row) for row in rows]


def _load_active_profile(conn) -> Optional[dict]:
    """Return the active profile as a dict with ``is_locked`` set, or ``None``."""
    row = conn.execute("SELECT * FROM profiles WHERE is_active = 1").fetchone()
    return _with_lock_flag(row) if row else None


# ---------------------------------------------------------------------------
# Password helpers
# ---------------------------------------------------------------------------

def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of ``password`` (UTF-8 encoded).

    NOTE(review): this is an unsalted fast hash. It is kept as-is because
    hashes already stored in the database use this format; migrating to a
    salted KDF needs a stored-hash upgrade path.
    """
    return hashlib.sha256(password.encode('utf-8')).hexdigest()


def _password_matches(stored_hash, supplied_password: str) -> bool:
    """Compare a stored hash with the hash of ``supplied_password``.

    Returns ``False`` when no hash is stored. Uses a constant-time comparison
    so the check does not leak how many leading characters matched.
    """
    if not stored_hash:
        return False
    candidate = hash_password(supplied_password)
    return hmac.compare_digest(str(stored_hash).encode('utf-8'), candidate.encode('utf-8'))


# ---------------------------------------------------------------------------
# Response helpers
# ---------------------------------------------------------------------------

def _redirect(path: str = "", **params) -> RedirectResponse:
    """Build a 303 redirect to ``ROOT_PATH/<path>`` with URL-quoted query params.

    Parameters whose value is ``None`` are omitted. Typical usage is
    ``_redirect(success_msg=...)`` or ``_redirect(error_msg=...)``.
    """
    url = f"{ROOT_PATH}/{path}"
    query = "&".join(
        f"{key}={quote(str(value))}" for key, value in params.items() if value is not None
    )
    if query:
        url = f"{url}?{query}"
    return RedirectResponse(url=url, status_code=303)


def _content_disposition(filename: str) -> str:
    """Return an ``attachment`` Content-Disposition header value for ``filename``.

    Names that need percent-encoding are sent in the ``filename*`` form so the
    header stays ASCII-only.
    """
    quoted = quote(filename)
    if quoted != filename:
        return f"attachment; filename*=utf-8''{quoted}"
    return f'attachment; filename="{filename}"'


# ---------------------------------------------------------------------------
# Build detection
# ---------------------------------------------------------------------------

def is_compiling() -> bool:
    """
    Dynamically checks if a build is happening by scanning active processes.
    Includes a timeout check to prevent infinite lockups if the builder hangs.

    Returns ``True`` when a process whose command line contains ``waf`` has
    been running for no longer than ``CBS_BUILD_TIMEOUT_SEC`` seconds
    (default 900). Returns ``False`` otherwise, including when ``/proc``
    cannot be scanned at all.
    """
    try:
        # Match the timeout set in docker-compose (default 900 seconds / 15 mins)
        timeout_sec = int(os.environ.get("CBS_BUILD_TIMEOUT_SEC", 900))

        # Get the overall system uptime to calculate process age
        with open('/proc/uptime', 'r') as f:
            sys_uptime = float(f.read().split()[0])

        ticks_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])

        for pid in os.listdir('/proc'):
            if not pid.isdigit():
                continue
            try:
                with open(f'/proc/{pid}/cmdline', 'r') as f:
                    cmdline = f.read().replace('\x00', ' ')

                # 'waf' is the build system for ArduPilot
                if 'waf' not in cmdline:
                    continue

                # Read the process stats to find out when it started
                with open(f'/proc/{pid}/stat', 'r') as f:
                    stat_text = f.read()

                # The second field (comm) is wrapped in parentheses and may
                # itself contain spaces, so split only what follows the LAST
                # ')'. That remainder starts at field 3, which puts field 22
                # (starttime, in clock ticks) at index 19.
                stat_fields = stat_text.rsplit(')', 1)[1].split()
                starttime_ticks = int(stat_fields[19])
                starttime_sec = starttime_ticks / ticks_per_sec
                process_uptime = sys_uptime - starttime_sec

                # If the waf process has been running longer than the timeout, it's hung
                if process_uptime > timeout_sec:
                    logger.warning(f"Ignoring stale 'waf' process (PID {pid}): running for {process_uptime:.1f}s (exceeds {timeout_sec}s timeout)")
                    continue

                return True
            except (OSError, IndexError, ValueError):
                # Process vanished, is unreadable, or has an unexpected stat layout.
                continue
    except Exception as e:
        logger.debug(f"Process scan failed/unavailable: {e}")

    return False


# ---------------------------------------------------------------------------
# Pages
# ---------------------------------------------------------------------------

@app.get("/", response_class=HTMLResponse)
async def index(request: Request, success_msg: Optional[str] = None, error_msg: Optional[str] = None):
    """Render the main page: all profiles, the active profile and its files."""
    with _open_db() as conn:
        profiles = _load_profiles(conn)
        active_profile = _load_active_profile(conn)

        files = []
        if active_profile:
            rows = conn.execute(
                "SELECT * FROM files WHERE profile_id = ? ORDER BY filepath",
                (active_profile['id'],),
            ).fetchall()
            files = [dict(row) for row in rows]

    # Distinct non-empty parent directories of the active profile's files.
    dirs = {os.path.dirname(f['filepath']) for f in files}
    dirs.discard("")

    return templates.TemplateResponse("index.html", {
        "request": request,
        "profiles": profiles,
        "active_profile": active_profile,
        "files": files,
        "dirs": sorted(dirs),
        "success_msg": success_msg,
        "error_msg": error_msg,
        "is_compiling": is_compiling()
    })


# ---------------------------------------------------------------------------
# Profile management
# ---------------------------------------------------------------------------

@app.post("/profile/create")
async def create_profile(name: str = Form(...)):
    """Create a new, inactive, writable profile named ``name`` (whitespace-trimmed)."""
    clean_name = name.strip()
    if not clean_name:
        return _redirect(error_msg="Profile name cannot be empty.")
    try:
        with _open_db() as conn:
            conn.execute(
                "INSERT INTO profiles (name, description, is_active, is_readonly) VALUES (?, '', 0, 0)",
                (clean_name,),
            )
        return _redirect(success_msg=f"Profile '{name}' created.")
    except sqlite3.IntegrityError:
        return _redirect(error_msg="A profile with that name already exists.")
    except Exception as e:
        logger.error(f"Profile creation failed: {e}")
        return _redirect(error_msg="Failed to create profile.")


@app.post("/profile/rename")
async def rename_profile(profile_id: int = Form(...), new_name: str = Form(...)):
    """Rename a profile unless it is read-only or locked."""
    clean_name = new_name.strip()
    if not clean_name:
        return _redirect(error_msg="Profile name cannot be empty.")
    try:
        with _open_db() as conn:
            prof = conn.execute(
                "SELECT is_readonly, password FROM profiles WHERE id = ?", (profile_id,)
            ).fetchone()
            if prof:
                if prof['is_readonly']:
                    return _redirect(error_msg="Cannot rename a read-only profile.")
                if prof['password']:
                    return _redirect(error_msg="Cannot rename a locked profile. Unlock it first.")

            conn.execute("UPDATE profiles SET name = ? WHERE id = ?", (clean_name, profile_id))
        return _redirect(success_msg=f"Profile renamed to '{new_name}'.")
    except sqlite3.IntegrityError:
        return _redirect(error_msg="A profile with that name already exists.")
    except Exception as e:
        logger.error(f"Profile rename failed: {e}")
        return _redirect(error_msg="Failed to rename profile.")


@app.post("/profile/annotate")
async def annotate_profile(profile_id: int = Form(...), description: str = Form("")):
    """Replace a profile's description unless it is read-only or locked."""
    try:
        with _open_db() as conn:
            prof = conn.execute(
                "SELECT is_readonly, password FROM profiles WHERE id = ?", (profile_id,)
            ).fetchone()
            if prof:
                if prof['is_readonly']:
                    return _redirect(error_msg="Cannot edit notes on a read-only profile.")
                if prof['password']:
                    return _redirect(error_msg="Cannot edit notes on a locked profile. Unlock it first.")

            conn.execute("UPDATE profiles SET description = ? WHERE id = ?", (description, profile_id))
        return _redirect(success_msg="Notes saved successfully.")
    except Exception as e:
        logger.error(f"Annotation failed: {e}")
        return _redirect(error_msg="Failed to save notes.")


@app.post("/profile/duplicate")
async def duplicate_profile(profile_id: int = Form(...)):
    """Clone a profile and its files under a fresh name and activate the clone.

    The clone is named ``"<name> (Copy)"``, or ``"<name> (Copy N)"`` with the
    first free ``N >= 2`` when that name is taken.
    """
    try:
        with _open_db() as conn:
            cursor = conn.cursor()
            src = cursor.execute(
                "SELECT name, description FROM profiles WHERE id = ?", (profile_id,)
            ).fetchone()
            if not src:
                logger.error("Profile duplication failed: Source profile not found.")
                return _redirect(error_msg="Failed to duplicate profile.")

            base_name = src['name']
            desc = src['description'] or ""

            new_name = f"{base_name} (Copy)"
            counter = 1
            while cursor.execute("SELECT id FROM profiles WHERE name = ?", (new_name,)).fetchone():
                counter += 1
                new_name = f"{base_name} (Copy {counter})"

            cursor.execute("UPDATE profiles SET is_active = 0")

            # Duplicates are NEVER read-only, and NEVER locked by default
            cursor.execute(
                "INSERT INTO profiles (name, description, is_active, is_readonly, password) VALUES (?, ?, 1, 0, NULL)",
                (new_name, desc),
            )
            new_profile_id = cursor.lastrowid

            files = cursor.execute(
                "SELECT filepath, content FROM files WHERE profile_id = ?", (profile_id,)
            ).fetchall()
            cursor.executemany(
                "INSERT INTO files (profile_id, filepath, content) VALUES (?, ?, ?)",
                [(new_profile_id, f['filepath'], f['content']) for f in files],
            )
        return _redirect(success_msg=f"Seamlessly cloned and activated '{new_name}'.")
    except Exception as e:
        logger.error(f"Profile duplication failed: {e}")
        return _redirect(error_msg="Failed to duplicate profile.")


@app.post("/profile/activate")
async def activate_profile(profile_id: int = Form(...)):
    """Make ``profile_id`` the single active profile.

    The target is verified to exist first; otherwise clearing ``is_active`` on
    every row would leave the application with no active profile.
    """
    try:
        with _open_db() as conn:
            target = conn.execute("SELECT id FROM profiles WHERE id = ?", (profile_id,)).fetchone()
            if not target:
                return _redirect(error_msg="Profile not found.")
            conn.execute("UPDATE profiles SET is_active = 0")
            conn.execute("UPDATE profiles SET is_active = 1 WHERE id = ?", (profile_id,))
        return _redirect()
    except Exception as e:
        logger.error(f"Profile activation failed: {e}")
        return _redirect(error_msg="Failed to activate profile.")


@app.post("/profile/delete")
async def delete_profile(profile_id: int = Form(...), password: str = Form(default="")):
    """Delete a profile.

    Refused for read-only profiles and for the active profile. A locked
    profile additionally requires its password.
    """
    try:
        with _open_db() as conn:
            prof = conn.execute(
                "SELECT is_readonly, is_active, password FROM profiles WHERE id = ?", (profile_id,)
            ).fetchone()
            if prof:
                if prof['is_readonly']:
                    return _redirect(error_msg="Cannot delete a read-only profile.")
                if prof['is_active']:
                    return _redirect(error_msg="Cannot delete the currently active profile. Switch to another first.")
                if prof['password']:
                    supplied = password.strip()
                    if not supplied:
                        return _redirect(error_msg="Cannot delete a locked profile without a password.")
                    if not _password_matches(prof['password'], supplied):
                        return _redirect(error_msg="Incorrect password. Profile not deleted.")

            conn.execute("DELETE FROM profiles WHERE id = ?", (profile_id,))
        return _redirect(success_msg="Profile deleted successfully.")
    except Exception as e:
        logger.error(f"Profile deletion failed: {e}")
        return _redirect(error_msg="Failed to delete profile.")


@app.post("/profile/lock")
async def lock_profile(profile_id: int = Form(...), password: str = Form(...)):
    """Lock a profile with ``password`` (whitespace-trimmed).

    Refused for read-only profiles and for profiles that are already locked:
    overwriting an existing password would let anyone replace the lock and
    then unlock the profile with their own password.
    """
    try:
        supplied = password.strip()
        if not supplied:
            return _redirect(error_msg="Password cannot be empty.")

        hashed_pw = hash_password(supplied)
        with _open_db() as conn:
            prof = conn.execute(
                "SELECT is_readonly, password FROM profiles WHERE id = ?", (profile_id,)
            ).fetchone()
            if not prof:
                return _redirect(error_msg="Profile not found.")
            if prof['is_readonly']:
                return _redirect(error_msg="Cannot lock a system read-only profile.")
            if prof['password']:
                return _redirect(error_msg="Profile is already locked. Unlock it first.")

            conn.execute("UPDATE profiles SET password = ? WHERE id = ?", (hashed_pw, profile_id))
        return _redirect(success_msg="Profile locked successfully.")
    except Exception as e:
        logger.error(f"Profile lock failed: {e}")
        return _redirect(error_msg="Failed to lock profile.")


@app.post("/profile/unlock")
async def unlock_profile(profile_id: int = Form(...), password: str = Form(...)):
    """Remove a profile's password when ``password`` matches the stored hash."""
    try:
        with _open_db() as conn:
            prof = conn.execute("SELECT password FROM profiles WHERE id = ?", (profile_id,)).fetchone()
            if not prof or not _password_matches(prof['password'], password.strip()):
                return _redirect(error_msg="Incorrect password.")

            conn.execute("UPDATE profiles SET password = NULL WHERE id = ?", (profile_id,))
        return _redirect(success_msg="Profile unlocked successfully.")
    except Exception as e:
        logger.error(f"Profile unlock failed: {e}")
        return _redirect(error_msg="Failed to unlock profile.")


# ---------------------------------------------------------------------------
# Deployment
# ---------------------------------------------------------------------------

def _clear_overlay_dir() -> None:
    """Remove every entry inside ``OVERLAY_DIR`` (the directory itself is kept)."""
    if not os.path.exists(OVERLAY_DIR):
        return
    for item in os.listdir(OVERLAY_DIR):
        item_path = os.path.join(OVERLAY_DIR, item)
        # Unlink symlinks rather than following them: rmtree refuses symlinks,
        # and a dangling link is neither a file nor a directory.
        if os.path.islink(item_path) or os.path.isfile(item_path):
            os.remove(item_path)
        elif os.path.isdir(item_path):
            shutil.rmtree(item_path)


def _resolve_overlay_target(relative_path: str) -> Optional[str]:
    """Map a stored file path to an absolute path strictly inside ``OVERLAY_DIR``.

    Returns ``None`` when the path would resolve to ``OVERLAY_DIR`` itself or
    escape it. A plain ``startswith`` test is not sufficient because a sibling
    such as ``/srvX`` shares the ``/srv`` string prefix.
    """
    target = os.path.abspath(os.path.join(OVERLAY_DIR, relative_path.lstrip("/")))
    if target == OVERLAY_DIR:
        return None
    if os.path.commonpath([OVERLAY_DIR, target]) != OVERLAY_DIR:
        return None
    return target


@app.post("/deploy")
async def deploy_active_profile():
    """Replace the contents of ``OVERLAY_DIR`` with the active profile's files.

    Deployment is refused while ``is_compiling()`` reports a running build.
    """
    # Double-check right before deploying to prevent race conditions
    if is_compiling():
        return _redirect(error_msg="Deployment blocked: A build is currently in progress.")

    try:
        with _open_db() as conn:
            active = conn.execute("SELECT id, name FROM profiles WHERE is_active = 1").fetchone()
            if not active:
                raise Exception("No active profile selected.")
            files = conn.execute(
                "SELECT filepath, content FROM files WHERE profile_id = ?", (active['id'],)
            ).fetchall()

        _clear_overlay_dir()
        os.makedirs(OVERLAY_DIR, exist_ok=True)

        count = 0
        for f in files:
            target = _resolve_overlay_target(f['filepath'])
            if target is None:
                logger.warning(f"Skipping file outside overlay directory: {f['filepath']!r}")
                continue
            os.makedirs(os.path.dirname(target), exist_ok=True)
            with open(target, "wb") as out_file:
                out_file.write(f['content'])
            count += 1

        # Redirect back into the app (under ROOT_PATH) and report the result.
        return _redirect(success_msg=f"Deployed {count} file(s) from profile '{active['name']}'.")
    except Exception as e:
        logger.error(f"Deployment failed: {e}")
        return _redirect(error_msg="Failed to deploy files to staging directory.")


# ---------------------------------------------------------------------------
# File management
# ---------------------------------------------------------------------------

@app.post("/file/upload")
async def upload_file(
    file: UploadFile = File(...),
    existing_path: str = Form(""),
    new_path: str = Form(""),
    profile_id: int = Form(...)
):
    """Store an uploaded file in a profile, replacing any file at the same path.

    The directory is ``new_path`` when given, otherwise ``existing_path``; only
    the base name of the uploaded filename is used. Refused for read-only or
    locked profiles, for uploads without a filename, and for directories
    containing ``..`` components.
    """
    try:
        with _open_db() as conn:
            prof = conn.execute(
                "SELECT is_readonly, password FROM profiles WHERE id = ?", (profile_id,)
            ).fetchone()
            if prof:
                if prof['is_readonly']:
                    return _redirect(error_msg="Cannot upload files to a read-only profile.")
                if prof['password']:
                    return _redirect(error_msg="Cannot upload files to a locked profile.")

            target_path = new_path.strip() or existing_path.strip()
            clean_path = target_path.strip("/")

            safe_filename = os.path.basename(file.filename or "")
            if not safe_filename:
                return _redirect(error_msg="No file was selected for upload.")

            full_filepath = os.path.join(clean_path, safe_filename) if clean_path else safe_filename

            # Reject parent-directory references so stored paths always stay
            # inside the overlay directory when deployed.
            if ".." in full_filepath.replace("\\", "/").split("/"):
                return _redirect(error_msg="Invalid target path.")

            content = await file.read()

            conn.execute("""
                INSERT INTO files (profile_id, filepath, content)
                VALUES (?, ?, ?)
                ON CONFLICT(profile_id, filepath) DO UPDATE SET content=excluded.content
            """, (profile_id, full_filepath, content))

        return _redirect(success_msg=f"Successfully saved {safe_filename} to profile.")
    except Exception as e:
        logger.error(f"Upload failed: {e}")
        return _redirect(error_msg="Failed to upload file to database.")


@app.post("/file/delete")
async def delete_file(file_id: int = Form(...)):
    """Delete a file unless its profile is read-only or locked."""
    try:
        with _open_db() as conn:
            file_record = conn.execute("SELECT profile_id FROM files WHERE id = ?", (file_id,)).fetchone()
            if file_record:
                prof = conn.execute(
                    "SELECT is_readonly, password FROM profiles WHERE id = ?",
                    (file_record['profile_id'],),
                ).fetchone()
                if prof and (prof['is_readonly'] or prof['password']):
                    return _redirect(error_msg="Cannot delete file from a locked or read-only profile.")

            conn.execute("DELETE FROM files WHERE id = ?", (file_id,))
        return _redirect()
    except Exception as e:
        logger.error(f"File deletion failed: {e}")
        return _redirect(error_msg="Failed to delete file.")


@app.get("/edit", response_class=HTMLResponse)
async def edit_file(request: Request, file_id: int, error_msg: Optional[str] = None):
    """Render the text editor for a file; binary (non-UTF-8) files are refused."""
    try:
        with _open_db() as conn:
            file_record = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone()
            active_profile = _load_active_profile(conn)
            profiles = _load_profiles(conn)

        if not file_record:
            return _redirect(error_msg="File not found in database.")

        try:
            content_text = file_record['content'].decode('utf-8')
        except UnicodeDecodeError:
            return _redirect(error_msg=f"Safety Lock: {os.path.basename(file_record['filepath'])} appears to be a binary file and cannot be opened in the text editor.")

        return templates.TemplateResponse("edit.html", {
            "request": request,
            "file_id": file_id,
            "filepath": file_record['filepath'],
            "content": content_text,
            "active_profile": active_profile,
            "profiles": profiles,
            "error_msg": error_msg
        })
    except Exception as e:
        logger.error(f"Edit GET failed: {e}")
        return _redirect(error_msg=f"System Error loading file: {str(e)}")


@app.post("/edit")
async def save_file(file_id: int = Form(...), content: str = Form(...)):
    """Save edited text (as UTF-8) unless the file's profile is read-only or locked."""
    try:
        content_bytes = content.encode('utf-8')
        with _open_db() as conn:
            file_record = conn.execute("SELECT profile_id FROM files WHERE id = ?", (file_id,)).fetchone()
            if file_record:
                prof = conn.execute(
                    "SELECT is_readonly, password FROM profiles WHERE id = ?",
                    (file_record['profile_id'],),
                ).fetchone()
                if prof and (prof['is_readonly'] or prof['password']):
                    return _redirect("edit", file_id=file_id, error_msg="Cannot modify files in a locked or read-only profile.")

            conn.execute("UPDATE files SET content = ? WHERE id = ?", (content_bytes, file_id))

        return _redirect(success_msg="Successfully saved changes.")
    except Exception as e:
        logger.error(f"Edit POST failed: {e}")
        return _redirect("edit", file_id=file_id, error_msg="Failed to save file.")


@app.get("/download")
async def download_single_file(file_id: int):
    """Send a stored file to the client as an attachment.

    The content is served straight from memory, so no temporary file is left
    on disk. Raises ``HTTPException`` (404) when the file does not exist;
    database errors redirect to the main page with an error message.
    """
    try:
        with _open_db() as conn:
            file_record = conn.execute(
                "SELECT filepath, content FROM files WHERE id = ?", (file_id,)
            ).fetchone()
    except Exception as e:
        logger.error(f"Download single file failed: {e}")
        return _redirect(error_msg="Failed to download file.")

    # Raised outside the try block so the 404 is not swallowed by the generic handler.
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    download_name = os.path.basename(file_record['filepath'])
    return Response(
        content=file_record['content'],
        media_type='application/octet-stream',
        headers={"Content-Disposition": _content_disposition(download_name)},
    )