"""Flask application that runs per-user music download jobs and serves the results."""

import io
import json
import os
import shutil
import subprocess
import threading
import time
import uuid
import zipfile
from pathlib import Path

from flask import (
    Flask,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)

import db as database
from utils import rename_from_metadata, sanitize_filename, cleanup_empty_dirs

app = Flask(__name__)
# Without SECRET_KEY in the environment a random key is generated per process,
# so existing session cookies stop validating after a restart.
app.secret_key = os.environ.get("SECRET_KEY", os.urandom(32).hex())

# Paths
DOWNLOADS_DIR = Path(os.environ.get("DOWNLOADS_DIR", "/downloads"))
COOKIES_PATH = Path(os.environ.get("COOKIES_PATH", "/config/cookies.txt"))
CONFIG_DIR = Path(os.environ.get("CONFIG_DIR", "/config"))
WVD_PATH = Path(os.environ.get("WVD_PATH", "/config/device.wvd"))
TEMP_DIR = Path("/tmp/votify")

DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)
TEMP_DIR.mkdir(parents=True, exist_ok=True)

# Database
DB_PATH = CONFIG_DIR / "trackpull.db"
database.init_db(DB_PATH)

# In-memory job tracking: job id -> job dict. Guard every access with jobs_lock.
jobs: dict[str, dict] = {}
jobs_lock = threading.Lock()


# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------

@app.before_request
def require_login():
    """Reject unauthenticated requests to every non-public endpoint.

    Returns None (request proceeds) for public endpoints and logged-in
    sessions; otherwise a 401 JSON response for ``/api/`` paths or a redirect
    to the login page for everything else.
    """
    public = {"login", "logout", "static", "service_worker", "offline"}
    if request.endpoint in public:
        return
    if not session.get("user_id"):
        if request.path.startswith("/api/"):
            return jsonify({"error": "Unauthorized"}), 401
        return redirect(url_for("login"))


def require_admin():
    """Return a 403 JSON response unless the session role is ``admin``.

    Returns None when the caller is an admin, so views use it as
    ``guard = require_admin(); if guard: return guard``.
    """
    if session.get("role") != "admin":
        return jsonify({"error": "Forbidden"}), 403


# ---------------------------------------------------------------------------
# Per-user download directory
# ---------------------------------------------------------------------------

def get_user_downloads_dir(user_id: str) -> Path:
    """Return (creating it if needed) the download directory for *user_id*.

    Raises:
        ValueError: if *user_id* would place the directory outside
            DOWNLOADS_DIR (for example ``".."`` or an absolute path).
    """
    d = DOWNLOADS_DIR / user_id
    # user_id can arrive from a query string, so make sure the joined path
    # stays under the downloads root before creating anything. abspath
    # normalises ".." lexically without following symlinks.
    root = Path(os.path.abspath(DOWNLOADS_DIR))
    if not Path(os.path.abspath(d)).is_relative_to(root):
        raise ValueError("Invalid user id")
    d.mkdir(parents=True, exist_ok=True)
    return d


# ---------------------------------------------------------------------------
# Audio helpers
# ---------------------------------------------------------------------------

_LOG_TAIL = 500  # earlier output lines kept whenever a new line is appended


def _append_job_output(job_id: str, msg: str) -> None:
    """Append *msg* to the job's output, keeping the last 500 earlier lines."""
    with jobs_lock:
        job = jobs[job_id]
        job["output"] = job.get("output", [])[-_LOG_TAIL:] + [msg]


def _persist_job(job_id: str) -> None:
    """Write the in-memory job to the database, if it is still tracked."""
    with jobs_lock:
        job = jobs.get(job_id)
        # The job may have been evicted (e.g. its user was deleted) while the
        # worker thread was still running.
        if job is not None:
            database.upsert_job(job)


def _register_job(user_id: str, urls: list, options: dict) -> str:
    """Create a queued in-memory job and return its 8-character id."""
    job_id = str(uuid.uuid4())[:8]
    with jobs_lock:
        jobs[job_id] = {
            "id": job_id,
            "user_id": user_id,
            "urls": urls,
            "options": options,
            "status": "queued",
            "output": [],
            "created_at": time.time(),
        }
    return job_id


def snapshot_audio_files(directory: Path) -> set[Path]:
    """Return every .m4a/.ogg/.opus file found recursively under *directory*."""
    extensions = {".m4a", ".ogg", ".opus"}
    files = set()
    for ext in extensions:
        files.update(directory.rglob(f"*{ext}"))
    return files


def convert_to_mp3(job_id: str, before: set[Path], user_dir: Path):
    """Convert audio files that appeared under *user_dir* since *before* to MP3.

    Each new file is transcoded with ffmpeg next to the source; the source is
    removed only when ffmpeg exits with status 0. Progress is appended to the
    job's output.
    """
    new_files = snapshot_audio_files(user_dir) - before
    if not new_files:
        return

    def log(msg):
        _append_job_output(job_id, msg)

    log(f"[mp3] Converting {len(new_files)} file(s) to MP3...")
    for src in sorted(new_files):
        dst = src.with_suffix(".mp3")
        log(f"[mp3] Converting: {src.name}")
        result = subprocess.run(
            ["ffmpeg", "-y", "-i", str(src), "-codec:v", "copy", "-q:a", "2", str(dst)],
            capture_output=True,
            text=True,
            errors="replace",  # ffmpeg may echo tag bytes that are not valid text
        )
        if result.returncode == 0:
            src.unlink()
            log(f"[mp3] Done: {dst.name}")
        else:
            log(f"[mp3] Failed: {src.name} - {result.stderr.strip()[-200:]}")
    log("[mp3] Conversion complete.")


AUDIO_EXTS = {".m4a", ".ogg", ".opus", ".mp3", ".flac"}


def post_process_votify_files(target_dir, job_id, user_dir: Path):
    """Flatten subdirs, rename files from metadata, wrap single tracks.

    Returns the list of paths produced by the rename step (a wrapped single
    track is still reported at its pre-wrap path).
    """
    target_dir = Path(target_dir)

    def log(msg):
        _append_job_output(job_id, msg)

    # 1. Move any files in subdirectories up to target_dir
    for ext in AUDIO_EXTS:
        for f in list(target_dir.rglob(f"*{ext}")):
            if f.parent != target_dir:
                dest = target_dir / f.name
                counter = 2
                # Avoid clobbering an existing file with the same name.
                while dest.exists():
                    dest = target_dir / f"{f.stem} ({counter}){f.suffix}"
                    counter += 1
                shutil.move(str(f), str(dest))

    # 2. Rename from metadata
    renamed = []
    for ext in AUDIO_EXTS:
        for f in list(target_dir.glob(f"*{ext}")):
            new_path = rename_from_metadata(f)
            renamed.append(new_path)
            if new_path != f:
                log(f"[post] Renamed: {new_path.name}")

    # 3. Clean up empty subdirs
    cleanup_empty_dirs(target_dir)

    # 4. Single-track wrapping (only if downloading to user's root dir)
    if len(renamed) == 1 and target_dir == user_dir:
        f = renamed[0]
        folder_name = sanitize_filename(f.stem)
        wrapper = target_dir / folder_name
        wrapper.mkdir(exist_ok=True)
        shutil.move(str(f), str(wrapper / f.name))
        log(f"[post] Wrapped single track in folder: {folder_name}")

    return renamed


# ---------------------------------------------------------------------------
# Download runners
# ---------------------------------------------------------------------------

# (option key, votify flag, default used when the key is absent)
_VOTIFY_VALUE_OPTIONS = (
    ("audio_quality", "--audio-quality", "aac-medium"),
    ("download_mode", "--download-mode", "ytdlp"),
    ("video_format", "--video-format", "mp4"),
    ("cover_size", "--cover-size", "large"),
)

# (option key, votify switch appended when the option is truthy)
_VOTIFY_FLAG_OPTIONS = (
    ("save_cover", "--save-cover"),
    ("save_playlist", "--save-playlist"),
    ("overwrite", "--overwrite"),
    ("download_music_videos", "--download-music-videos"),
    ("save_lrc", "--lrc-only"),
    ("no_lrc", "--no-lrc"),
)


def _build_votify_command(urls: list[str], options: dict, output_dir: str) -> list[str]:
    """Translate the job options into a votify argument list."""
    cmd = ["votify"]
    cmd.extend(["--cookies-path", str(COOKIES_PATH)])
    cmd.extend(["--output-path", output_dir])
    cmd.extend(["--temp-path", str(TEMP_DIR)])
    if WVD_PATH.exists():
        cmd.extend(["--wvd-path", str(WVD_PATH)])

    # "." as every folder template keeps votify's output flat in output_dir.
    cmd.extend(["--template-folder-album", "."])
    cmd.extend(["--template-folder-compilation", "."])
    cmd.extend(["--template-folder-episode", "."])
    cmd.extend(["--template-folder-music-video", "."])

    for key, flag, default in _VOTIFY_VALUE_OPTIONS:
        value = options.get(key, default)
        if value:
            # str() so a non-string JSON value cannot break Popen / " ".join.
            cmd.extend([flag, str(value)])

    for key, flag in _VOTIFY_FLAG_OPTIONS:
        if options.get(key):
            cmd.append(flag)

    truncate = options.get("truncate")
    if truncate:
        cmd.extend(["--truncate", str(truncate)])

    cmd.extend(urls)
    return cmd


def run_download(job_id: str, urls: list[str], options: dict, output_path: str = None, user_id: str = None):
    """Run votify for *urls* and record progress and the result on the job.

    Args:
        job_id: key of an existing entry in ``jobs``.
        urls: URLs passed to votify as positional arguments.
        options: job options (see ``_build_votify_command``); an
            ``output_format`` of ``"mp3"`` triggers conversion afterwards.
        output_path: directory votify writes to; defaults to the user's dir.
        user_id: owner of the job; without it DOWNLOADS_DIR itself is used.
    """
    user_dir = get_user_downloads_dir(user_id) if user_id else DOWNLOADS_DIR
    effective_output = output_path or str(user_dir)
    cmd = _build_votify_command(urls, options, effective_output)

    want_mp3 = options.get("output_format") == "mp3"
    files_before = snapshot_audio_files(user_dir) if want_mp3 else None

    try:
        with jobs_lock:
            jobs[job_id]["status"] = "running"
            jobs[job_id]["command"] = " ".join(cmd)

        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors="replace",
            bufsize=1,
        )
        with jobs_lock:
            jobs[job_id]["process"] = process

        output_lines = []
        with process.stdout:
            for line in process.stdout:
                with jobs_lock:
                    job = jobs[job_id]
                    stop = job["status"] == "cancelled"
                    if not stop:
                        output_lines.append(line.rstrip("\n"))
                        job["output"] = output_lines[-500:]
                if stop:
                    # The cancel request may have arrived before "process" was
                    # stored on the job, so make sure the child is stopped
                    # here rather than waiting on it with an unread pipe.
                    process.terminate()
                    break
        process.wait()

        with jobs_lock:
            cancelled = jobs[job_id]["status"] == "cancelled"

        if cancelled:
            with jobs_lock:
                jobs[job_id]["output"] = jobs[job_id].get("output", []) + ["[cancelled] Job was cancelled by user."]
        else:
            if process.returncode == 0:
                target = Path(effective_output)
                post_process_votify_files(target, job_id, user_dir)
            if want_mp3:
                convert_to_mp3(job_id, files_before, user_dir)
            with jobs_lock:
                jobs[job_id]["status"] = "completed" if process.returncode == 0 else "failed"
                jobs[job_id]["return_code"] = process.returncode
    except Exception as e:
        # Thread boundary: record the failure on the job instead of letting
        # the worker die silently.
        with jobs_lock:
            job = jobs.get(job_id)
            if job is not None:
                job["status"] = "failed"
                job["output"] = job.get("output", []) + [str(e)]

    _persist_job(job_id)


def _run_monochrome(job_id: str, url: str, quality: str, user_dir: Path):
    """Run the Monochrome downloader for one URL and record status on the job.

    Returns ``(ok, fail_info)``: *ok* is False when the downloader raised;
    *fail_info* is the downloader's third return value (an empty dict when it
    is missing or the call raised).
    """
    with jobs_lock:
        jobs[job_id]["status"] = "running"

    def log(msg):
        _append_job_output(job_id, msg)

    def is_cancelled():
        with jobs_lock:
            return jobs[job_id]["status"] == "cancelled"

    try:
        # Imported at call time; presumably so the app still starts when the
        # monochrome package is unavailable -- TODO confirm.
        from monochrome.api import download_spotify_url

        success, _total, fail_info = download_spotify_url(
            spotify_url=url,
            quality=quality,
            output_dir=str(user_dir),
            log=log,
            cancel_check=is_cancelled,
        )
        with jobs_lock:
            job = jobs[job_id]
            if job["status"] != "cancelled":
                job["status"] = "completed" if success > 0 else "failed"
                job["return_code"] = 0 if success > 0 else 1
    except Exception as e:
        with jobs_lock:
            job = jobs.get(job_id)
            if job is not None:
                job["status"] = "failed"
                job["output"] = job.get("output", []) + [f"[error] {e}"]
                job["return_code"] = 1
        return False, {}
    return True, fail_info or {}


def run_monochrome_download(job_id: str, url: str, quality: str, user_id: str):
    """Download *url* with Monochrome at *quality* into the user's directory."""
    user_dir = get_user_downloads_dir(user_id)
    _run_monochrome(job_id, url, quality, user_dir)
    _persist_job(job_id)


def run_unified_download(job_id: str, url: str, user_id: str):
    """Download via Monochrome (MP3_320), then retry failed tracks with Votify.

    Tracks the Monochrome run reports in ``failed_urls`` are handed to a new
    Votify job (converted to MP3) that runs synchronously on this thread.
    """
    user_dir = get_user_downloads_dir(user_id)
    ok, fail_info = _run_monochrome(job_id, url, "MP3_320", user_dir)
    if not ok:
        _persist_job(job_id)
        return

    with jobs_lock:
        job = jobs.get(job_id)
        cancelled = job is None or job["status"] == "cancelled"

    failed_urls = fail_info.get("failed_urls", [])
    if cancelled or not failed_urls:
        _persist_job(job_id)
        return

    # Spawn Votify fallback job for failed tracks
    fallback_quality = database.get_setting("fallback_quality", "aac-medium")
    subfolder = fail_info.get("subfolder")
    output_path = str(user_dir / subfolder) if subfolder else str(user_dir)

    votify_job_id = _register_job(
        user_id,
        failed_urls,
        {"audio_quality": fallback_quality, "source": "votify-fallback"},
    )

    _append_job_output(
        job_id,
        f"[monochrome] {len(failed_urls)} track(s) failed — starting Votify fallback (job {votify_job_id})",
    )
    _persist_job(job_id)

    run_download(votify_job_id, failed_urls, {
        "audio_quality": fallback_quality,
        "output_format": "mp3",
    }, output_path=output_path, user_id=user_id)


# ---------------------------------------------------------------------------
# Background purge thread
# ---------------------------------------------------------------------------

def _purge_loop():
    """Hourly: drop expired job rows and user directories with no recent files.

    Expiry is controlled by the ``job_expiry_days`` setting; a value <= 0
    disables purging. A user directory is removed when it contains no files or
    when its newest file is older than the cutoff.
    """
    while True:
        time.sleep(3600)
        try:
            days = int(database.get_setting("job_expiry_days", "30"))
            if days <= 0:
                continue
            cutoff = time.time() - days * 86400
            database.delete_jobs_older_than(cutoff)

            if not DOWNLOADS_DIR.exists():
                continue
            for user_dir in DOWNLOADS_DIR.iterdir():
                if not user_dir.is_dir():
                    continue
                files = [f for f in user_dir.rglob("*") if f.is_file()]
                if not files or max(f.stat().st_mtime for f in files) < cutoff:
                    shutil.rmtree(user_dir, ignore_errors=True)
        except Exception as e:
            # Keep the loop alive; a failed pass is retried an hour later.
            print(f"[purge] {e}")


threading.Thread(target=_purge_loop, daemon=True).start()


# ---------------------------------------------------------------------------
# Request helpers
# ---------------------------------------------------------------------------

def _json_body() -> dict:
    """Return the request's JSON object, or {} when absent or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


# ---------------------------------------------------------------------------
# Static / offline routes
# ---------------------------------------------------------------------------

@app.route("/sw.js")
def service_worker():
    """Serve the service worker script from the static folder."""
    return send_from_directory("static", "sw.js", mimetype="application/javascript")


@app.route("/offline")
def offline():
    """Serve the offline fallback page."""
    return send_from_directory("static", "offline.html")


# ---------------------------------------------------------------------------
# Auth routes
# ---------------------------------------------------------------------------

@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate and start a session (POST)."""
    if request.method == "POST":
        username = request.form.get("username", "").strip()
        password = request.form.get("password", "")
        user = database.get_user_by_username(username)
        if user and database.verify_password(user, password):
            # Start from a clean session so nothing carries over from a
            # previous login.
            session.clear()
            session["user_id"] = user["id"]
            session["username"] = user["username"]
            session["role"] = user["role"]
            database.update_last_login(user["id"])
            return redirect(url_for("index"))
        return render_template("login.html", error="Invalid username or password")
    return render_template("login.html", error=None)


@app.route("/logout")
def logout():
    """Clear the session and return to the login page."""
    session.clear()
    return redirect(url_for("login"))


@app.route("/")
def index():
    """Render the main page for the logged-in user."""
    return render_template(
        "index.html",
        username=session.get("username"),
        role=session.get("role"),
    )


# ---------------------------------------------------------------------------
# Account routes
# ---------------------------------------------------------------------------

@app.route("/api/account/password", methods=["POST"])
def change_password():
    """Change the current user's password after verifying the old one."""
    data = _json_body()
    user = database.get_user_by_id(session["user_id"])
    # user can be missing if the account was deleted while the session lived.
    if not user or not database.verify_password(user, data.get("current_password", "")):
        return jsonify({"error": "Current password incorrect"}), 400
    new_pw = data.get("new_password", "")
    if not new_pw:
        return jsonify({"error": "New password cannot be empty"}), 400
    database.update_user_password(session["user_id"], new_pw)
    return jsonify({"ok": True})


# ---------------------------------------------------------------------------
# Download routes
# ---------------------------------------------------------------------------

@app.route("/api/download", methods=["POST"])
def start_download():
    """Queue a Votify download for the newline-separated (or listed) URLs."""
    data = _json_body()
    raw_urls = data.get("urls", "")
    if isinstance(raw_urls, str):
        candidates = raw_urls.split("\n")
    elif isinstance(raw_urls, list):
        candidates = raw_urls
    else:
        candidates = []
    urls = [u.strip() for u in candidates if isinstance(u, str) and u.strip()]
    if not urls:
        return jsonify({"error": "No URLs provided"}), 400

    if not COOKIES_PATH.exists():
        return jsonify({"error": "cookies.txt not found. Mount it to /config/cookies.txt"}), 400

    options = {
        "audio_quality": data.get("audio_quality", "aac-medium"),
        "download_mode": data.get("download_mode", "ytdlp"),
        "video_format": data.get("video_format", "mp4"),
        "cover_size": data.get("cover_size", "large"),
        "save_cover": data.get("save_cover", False),
        "save_playlist": data.get("save_playlist", False),
        "overwrite": data.get("overwrite", False),
        "download_music_videos": data.get("download_music_videos", False),
        "save_lrc": data.get("save_lrc", False),
        "no_lrc": data.get("no_lrc", False),
        "truncate": data.get("truncate"),
        "output_format": data.get("output_format", "original"),
    }

    user_id = session["user_id"]
    job_id = _register_job(user_id, urls, options)

    thread = threading.Thread(
        target=run_download, args=(job_id, urls, options, None, user_id), daemon=True
    )
    thread.start()
    return jsonify({"job_id": job_id})


@app.route("/api/monochrome/download", methods=["POST"])
def start_monochrome_download():
    """Queue a Monochrome download of one URL at the requested quality."""
    data = _json_body()
    url = str(data.get("url") or "").strip()
    quality = data.get("quality", "HI_RES_LOSSLESS")

    if not url:
        return jsonify({"error": "No URL provided"}), 400

    valid_qualities = ["HI_RES_LOSSLESS", "LOSSLESS", "HIGH", "LOW", "MP3_320"]
    if quality not in valid_qualities:
        return jsonify({"error": f"Invalid quality. Choose from: {valid_qualities}"}), 400

    user_id = session["user_id"]
    job_id = _register_job(user_id, [url], {"quality": quality, "source": "monochrome"})

    thread = threading.Thread(
        target=run_monochrome_download, args=(job_id, url, quality, user_id), daemon=True
    )
    thread.start()
    return jsonify({"job_id": job_id})


@app.route("/api/unified/download", methods=["POST"])
def start_unified_download():
    """Queue a Monochrome download with automatic Votify fallback."""
    data = _json_body()
    url = str(data.get("url") or "").strip()
    if not url:
        return jsonify({"error": "No URL provided"}), 400

    user_id = session["user_id"]
    job_id = _register_job(user_id, [url], {"source": "unified"})

    thread = threading.Thread(
        target=run_unified_download, args=(job_id, url, user_id), daemon=True
    )
    thread.start()
    return jsonify({"job_id": job_id})


# ---------------------------------------------------------------------------
# Settings routes
# ---------------------------------------------------------------------------

@app.route("/api/settings", methods=["GET"])
def get_settings():
    """Return the fallback quality and job expiry settings."""
    return jsonify({
        "fallback_quality": database.get_setting("fallback_quality", "aac-medium"),
        "job_expiry_days": int(database.get_setting("job_expiry_days", "30")),
    })


@app.route("/api/settings", methods=["POST"])
def save_settings():
    """Admin only: store the fallback quality and, if given, the expiry days."""
    guard = require_admin()
    if guard:
        return guard
    data = _json_body()
    allowed_qualities = ["aac-medium", "aac-high"]
    quality = data.get("fallback_quality", "aac-medium")
    if quality not in allowed_qualities:
        quality = "aac-medium"
    database.set_setting("fallback_quality", quality)
    if "job_expiry_days" in data:
        try:
            days = max(0, int(data["job_expiry_days"]))
        except (ValueError, TypeError):
            days = 30
        database.set_setting("job_expiry_days", str(days))
    return jsonify({"ok": True})


# ---------------------------------------------------------------------------
# Job routes
# ---------------------------------------------------------------------------

def job_to_dict(job):
    """Return a copy of *job* without its ``process`` entry (not serialisable)."""
    return {k: v for k, v in job.items() if k != "process"}


def _jobs_for_user(user_id):
    """Merge persisted and live jobs of *user_id*, newest first; live wins."""
    merged = {j["id"]: j for j in database.list_jobs_for_user(user_id)}
    with jobs_lock:
        live = {jid: job_to_dict(j) for jid, j in jobs.items() if j.get("user_id") == user_id}
    merged.update(live)
    return sorted(merged.values(), key=lambda j: j["created_at"], reverse=True)


@app.route("/api/jobs")
def list_jobs():
    """List the current user's jobs."""
    return jsonify(_jobs_for_user(session["user_id"]))


@app.route("/api/jobs/<job_id>")
def get_job(job_id):
    """Return one of the current user's jobs, preferring the live copy."""
    user_id = session["user_id"]
    with jobs_lock:
        job = jobs.get(job_id)
        live = job_to_dict(job) if job and job.get("user_id") == user_id else None
    if live is not None:
        return jsonify(live)
    job = database.get_job(job_id)
    if not job or job["user_id"] != user_id:
        return jsonify({"error": "Job not found"}), 404
    return jsonify(job)


@app.route("/api/jobs/<job_id>/cancel", methods=["POST"])
def cancel_job(job_id):
    """Mark a running job of the current user as cancelled and stop its process."""
    user_id = session["user_id"]
    with jobs_lock:
        job = jobs.get(job_id)
        if not job or job.get("user_id") != user_id:
            return jsonify({"error": "Job not found"}), 404
        if job["status"] != "running":
            return jsonify({"error": "Job is not running"}), 400
        job["status"] = "cancelled"
        proc = job.get("process")
        if proc:
            proc.terminate()
    return jsonify({"ok": True})


@app.route("/api/jobs/<job_id>", methods=["DELETE"])
def delete_job(job_id):
    """Delete one of the current user's jobs from memory and the database.

    Deleting an id that does not exist still returns ``{"ok": true}``; an id
    that belongs to another user is reported as not found.
    """
    user_id = session["user_id"]
    with jobs_lock:
        job = jobs.get(job_id)
        if job is not None:
            if job.get("user_id") != user_id:
                return jsonify({"error": "Job not found"}), 404
            if job["status"] == "running":
                return jsonify({"error": "Cancel the job first"}), 400
            del jobs[job_id]
    # The database row is keyed by job id only, so check ownership before
    # deleting; otherwise any user could remove another user's job.
    stored = database.get_job(job_id)
    if stored and stored["user_id"] != user_id:
        return jsonify({"error": "Job not found"}), 404
    database.delete_job(job_id)
    return jsonify({"ok": True})


# ---------------------------------------------------------------------------
# File routes (scoped to current user)
# ---------------------------------------------------------------------------

def _resolve_user_path(user_id: str, rel_path: str):
    """Returns (user_dir, target) or raises ValueError on traversal."""
    user_dir = get_user_downloads_dir(user_id)
    target = (user_dir / rel_path).resolve()
    # Compare path components, not string prefixes: a prefix test would accept
    # a sibling such as "<root>/ab" when the user directory is "<root>/a".
    if not target.is_relative_to(user_dir.resolve()):
        raise ValueError("Invalid path")
    return user_dir, target


def _list_dir_response(user_dir: Path, target: Path):
    """JSON listing of *target* (dirs first, then by name), paths relative to *user_dir*."""
    if not target.is_dir():
        return jsonify([])
    # target is resolved, so the base must be resolved too for relative_to.
    base = user_dir.resolve()
    items = []
    try:
        for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
            rel = entry.relative_to(base)
            items.append({
                "name": entry.name,
                "path": str(rel).replace("\\", "/"),
                "is_dir": entry.is_dir(),
                "size": entry.stat().st_size if entry.is_file() else None,
            })
    except PermissionError:
        # Best effort: return whatever could be read.
        pass
    return jsonify(items)


def _send_file_response(target: Path):
    """Send *target* as an attachment, or 404 when it is not a file."""
    if not target.is_file():
        return jsonify({"error": "File not found"}), 404
    return send_from_directory(target.parent, target.name, as_attachment=True)


def _zip_folder_response(target: Path):
    """Send *target* as an in-memory zip archive, or 404 when not a directory."""
    if not target.is_dir():
        return jsonify({"error": "Folder not found"}), 404
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for file in target.rglob("*"):
            if file.is_file():
                zf.write(file, file.relative_to(target))
    # A double quote in the name would terminate the quoted filename early.
    folder_name = (target.name or "downloads").replace('"', "'")
    return app.response_class(
        buf.getvalue(),
        mimetype="application/zip",
        headers={"Content-Disposition": f'attachment; filename="{folder_name}.zip"'},
    )


def _delete_target_response(user_dir: Path, target: Path):
    """Delete *target* (file or tree) and return the JSON result."""
    # rel_path values such as "." resolve to the user's root directory.
    if target == user_dir.resolve():
        return jsonify({"error": "Cannot delete root"}), 400
    if not target.exists():
        return jsonify({"error": "Not found"}), 404
    try:
        if target.is_dir():
            shutil.rmtree(target)
        else:
            target.unlink()
    except OSError as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})


@app.route("/api/files")
def list_files():
    """List a directory inside the current user's downloads."""
    user_id = session["user_id"]
    rel_path = request.args.get("path", "")
    try:
        user_dir, target = _resolve_user_path(user_id, rel_path)
    except ValueError:
        return jsonify({"error": "Invalid path"}), 403
    return _list_dir_response(user_dir, target)


@app.route("/api/files/download")
def download_file():
    """Download one file from the current user's downloads."""
    user_id = session["user_id"]
    rel_path = request.args.get("path", "")
    try:
        _, target = _resolve_user_path(user_id, rel_path)
    except ValueError:
        return jsonify({"error": "Invalid path"}), 403
    return _send_file_response(target)


@app.route("/api/files/download-folder")
def download_folder():
    """Download a folder from the current user's downloads as a zip."""
    user_id = session["user_id"]
    rel_path = request.args.get("path", "")
    try:
        _, target = _resolve_user_path(user_id, rel_path)
    except ValueError:
        return jsonify({"error": "Invalid path"}), 403
    return _zip_folder_response(target)


@app.route("/api/files/delete", methods=["DELETE"])
def delete_path():
    """Delete a file or folder inside the current user's downloads."""
    user_id = session["user_id"]
    rel_path = request.args.get("path", "")
    if not rel_path:
        return jsonify({"error": "Cannot delete root"}), 400
    try:
        user_dir, target = _resolve_user_path(user_id, rel_path)
    except ValueError:
        return jsonify({"error": "Invalid path"}), 403
    return _delete_target_response(user_dir, target)


# ---------------------------------------------------------------------------
# Cookies / WVD routes
# ---------------------------------------------------------------------------

def _save_upload(destination: Path):
    """Store the uploaded ``file`` form field at *destination*."""
    if "file" not in request.files:
        return jsonify({"error": "No file uploaded"}), 400
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    request.files["file"].save(destination)
    return jsonify({"ok": True})


@app.route("/api/cookies", methods=["GET"])
def check_cookies():
    """Admin only: report whether the cookies file exists."""
    guard = require_admin()
    if guard:
        return guard
    return jsonify({"exists": COOKIES_PATH.exists()})


@app.route("/api/cookies", methods=["POST"])
def upload_cookies():
    """Admin only: replace the cookies file with the uploaded one."""
    guard = require_admin()
    if guard:
        return guard
    return _save_upload(COOKIES_PATH)


@app.route("/api/wvd", methods=["GET"])
def check_wvd():
    """Admin only: report whether the WVD file exists."""
    guard = require_admin()
    if guard:
        return guard
    return jsonify({"exists": WVD_PATH.exists()})


@app.route("/api/wvd", methods=["POST"])
def upload_wvd():
    """Admin only: replace the WVD file with the uploaded one."""
    guard = require_admin()
    if guard:
        return guard
    return _save_upload(WVD_PATH)


# ---------------------------------------------------------------------------
# Admin routes
# ---------------------------------------------------------------------------

def _admin_resolve_path(user_id: str, rel_path: str):
    """Same contract as ``_resolve_user_path``, for an admin-chosen user."""
    return _resolve_user_path(user_id, rel_path)


@app.route("/api/admin/users", methods=["GET"])
def admin_list_users():
    """Admin only: list all users."""
    guard = require_admin()
    if guard:
        return guard
    return jsonify(database.list_users())


@app.route("/api/admin/users", methods=["POST"])
def admin_create_user():
    """Admin only: create a user; unknown roles fall back to ``user``."""
    guard = require_admin()
    if guard:
        return guard
    data = _json_body()
    username = (data.get("username") or "").strip()
    password = data.get("password") or ""
    role = data.get("role", "user")
    if not username or not password:
        return jsonify({"error": "username and password required"}), 400
    if role not in ("admin", "user"):
        role = "user"
    try:
        user = database.create_user(username, password, role)
    except ValueError as e:
        return jsonify({"error": str(e)}), 409
    return jsonify({"ok": True, "user": {k: user[k] for k in ("id", "username", "role")}})


@app.route("/api/admin/users/<user_id>", methods=["DELETE"])
def admin_delete_user(user_id):
    """Admin only: delete a user, their live jobs and their download directory."""
    guard = require_admin()
    if guard:
        return guard
    if user_id == session["user_id"]:
        return jsonify({"error": "Cannot delete your own account"}), 400

    # Cancel and evict any live in-memory jobs belonging to this user
    with jobs_lock:
        for job in list(jobs.values()):
            if job.get("user_id") == user_id:
                proc = job.get("process")
                if proc:
                    try:
                        proc.terminate()
                    except Exception:
                        # Best effort: the process may already be gone.
                        pass
                del jobs[job["id"]]

    database.delete_user(user_id)  # cascades to jobs table

    user_dir = DOWNLOADS_DIR / user_id
    # Only ever remove a direct child of DOWNLOADS_DIR: ids such as "." or
    # ".." would otherwise point rmtree at the root or its parent.
    is_direct_child = (
        Path(os.path.abspath(user_dir)).parent == Path(os.path.abspath(DOWNLOADS_DIR))
    )
    if is_direct_child and user_dir.exists():
        shutil.rmtree(user_dir, ignore_errors=True)

    return jsonify({"ok": True})


@app.route("/api/admin/users/<user_id>/password", methods=["POST"])
def admin_reset_user_password(user_id):
    """Admin only: set a new password for *user_id*."""
    guard = require_admin()
    if guard:
        return guard
    new_password = _json_body().get("new_password", "")
    if not new_password:
        return jsonify({"error": "Password cannot be empty"}), 400
    if not database.get_user_by_id(user_id):
        return jsonify({"error": "User not found"}), 404
    database.update_user_password(user_id, new_password)
    return jsonify({"ok": True})


@app.route("/api/admin/users/<user_id>/jobs", methods=["GET"])
def admin_list_user_jobs(user_id):
    """Admin only: list the jobs of *user_id*."""
    guard = require_admin()
    if guard:
        return guard
    return jsonify(_jobs_for_user(user_id))


@app.route("/api/admin/jobs/<job_id>", methods=["DELETE"])
def admin_delete_job(job_id):
    """Admin only: delete any non-running job from memory and the database."""
    guard = require_admin()
    if guard:
        return guard
    with jobs_lock:
        if job_id in jobs:
            if jobs[job_id]["status"] == "running":
                return jsonify({"error": "Cancel the job first"}), 400
            del jobs[job_id]
    database.delete_job(job_id)
    return jsonify({"ok": True})


@app.route("/api/admin/files")
def admin_list_files():
    """Admin only: list a directory inside the given user's downloads."""
    guard = require_admin()
    if guard:
        return guard
    user_id = request.args.get("user_id", "")
    rel_path = request.args.get("path", "")
    if not user_id:
        return jsonify({"error": "user_id required"}), 400
    try:
        user_dir, target = _admin_resolve_path(user_id, rel_path)
    except ValueError:
        return jsonify({"error": "Invalid path"}), 403
    return _list_dir_response(user_dir, target)


@app.route("/api/admin/files/download")
def admin_download_file():
    """Admin only: download one file from the given user's downloads."""
    guard = require_admin()
    if guard:
        return guard
    user_id = request.args.get("user_id", "")
    rel_path = request.args.get("path", "")
    try:
        _, target = _admin_resolve_path(user_id, rel_path)
    except ValueError:
        return jsonify({"error": "Invalid path"}), 403
    return _send_file_response(target)


@app.route("/api/admin/files/download-folder")
def admin_download_folder():
    """Admin only: download a folder of the given user as a zip."""
    guard = require_admin()
    if guard:
        return guard
    user_id = request.args.get("user_id", "")
    rel_path = request.args.get("path", "")
    try:
        _, target = _admin_resolve_path(user_id, rel_path)
    except ValueError:
        return jsonify({"error": "Invalid path"}), 403
    return _zip_folder_response(target)


@app.route("/api/admin/files/delete", methods=["DELETE"])
def admin_delete_file():
    """Admin only: delete a file or folder inside the given user's downloads."""
    guard = require_admin()
    if guard:
        return guard
    user_id = request.args.get("user_id", "")
    rel_path = request.args.get("path", "")
    if not rel_path:
        return jsonify({"error": "Cannot delete root"}), 400
    try:
        user_dir, target = _admin_resolve_path(user_id, rel_path)
    except ValueError:
        return jsonify({"error": "Invalid path"}), 403
    return _delete_target_response(user_dir, target)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)