"""Flask web UI that runs Votify and Monochrome downloads as background jobs.

Jobs are tracked in an in-memory dict (lost on restart) and executed on daemon
threads. The file-browser endpoints expose the downloads directory only; every
user-supplied path is resolved and checked against that root before use.
"""

import hmac
import io
import json
import os
import shutil
import subprocess
import threading
import time
import uuid
import zipfile
from pathlib import Path
from typing import Optional

from flask import (
    Flask,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)

from utils import cleanup_empty_dirs, rename_from_metadata, sanitize_filename

app = Flask(__name__)
# Without SECRET_KEY a random key is generated per process, so existing
# sessions become invalid whenever the app restarts.
app.secret_key = os.environ.get("SECRET_KEY", os.urandom(32).hex())

# Auth
APP_PASSWORD = os.environ.get("PASSWORD", "")


@app.before_request
def require_login():
    """Gate every request behind the session login when a password is set.

    API paths get a JSON 401; everything else is redirected to the login page.
    """
    if not APP_PASSWORD:
        return None
    if request.endpoint in ("login", "static", "service_worker", "offline"):
        return None
    if session.get("authenticated"):
        return None
    if request.path.startswith("/api/"):
        return jsonify({"error": "Unauthorized"}), 401
    return redirect(url_for("login"))


# Paths
DOWNLOADS_DIR = Path(os.environ.get("DOWNLOADS_DIR", "/downloads"))
COOKIES_PATH = Path(os.environ.get("COOKIES_PATH", "/config/cookies.txt"))
CONFIG_DIR = Path(os.environ.get("CONFIG_DIR", "/config"))
WVD_PATH = Path(os.environ.get("WVD_PATH", "/config/device.wvd"))
TEMP_DIR = Path("/tmp/votify")

DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)
TEMP_DIR.mkdir(parents=True, exist_ok=True)

# In-memory job tracking
jobs: dict[str, dict] = {}
jobs_lock = threading.Lock()

AUDIO_EXTS = {".m4a", ".ogg", ".opus", ".mp3", ".flac"}


# --- Job bookkeeping helpers ---
# All helpers tolerate a job that has been deleted while its worker thread is
# still running: they become no-ops instead of raising KeyError in the thread.


def _create_job(urls: list, options: dict) -> str:
    """Register a new queued job and return its 8-character id."""
    with jobs_lock:
        job_id = str(uuid.uuid4())[:8]
        while job_id in jobs:  # truncated UUIDs can collide
            job_id = str(uuid.uuid4())[:8]
        jobs[job_id] = {
            "id": job_id,
            "urls": urls,
            "options": options,
            "status": "queued",
            "output": [],
            "created_at": time.time(),
        }
    return job_id


def _update_job(job_id: str, **fields) -> None:
    """Set the given fields on a job, if the job still exists."""
    with jobs_lock:
        job = jobs.get(job_id)
        if job is not None:
            job.update(fields)


def _append_log(job_id: str, msg: str) -> None:
    """Append one line to a job's output, keeping only the most recent lines."""
    with jobs_lock:
        job = jobs.get(job_id)
        if job is not None:
            job["output"] = job.get("output", [])[-500:] + [msg]


def _is_cancelled(job_id: str) -> bool:
    """Return True when the job was cancelled or no longer exists."""
    with jobs_lock:
        job = jobs.get(job_id)
        return job is None or job["status"] == "cancelled"


def _fail_job(job_id: str, message: str, return_code=None) -> None:
    """Mark a job failed and record the error message in its output."""
    with jobs_lock:
        job = jobs.get(job_id)
        if job is None:
            return
        job["status"] = "failed"
        job["output"] = job.get("output", []) + [message]
        if return_code is not None:
            job["return_code"] = return_code


# --- Votify download pipeline ---


def snapshot_audio_files(directory: Path) -> set[Path]:
    """Return every .m4a/.ogg/.opus file found recursively under *directory*."""
    return {
        path
        for ext in (".m4a", ".ogg", ".opus")
        for path in directory.rglob(f"*{ext}")
    }


def convert_to_mp3(job_id: str, before: set[Path]):
    """Convert audio files that appeared since *before* to MP3 using ffmpeg.

    Files are discovered by re-scanning DOWNLOADS_DIR and subtracting the
    *before* snapshot. A source file is deleted only when its conversion
    succeeded; failures are logged to the job output.
    """
    new_files = snapshot_audio_files(DOWNLOADS_DIR) - before
    if not new_files:
        return

    _append_log(job_id, f"[mp3] Converting {len(new_files)} file(s) to MP3...")
    for src in sorted(new_files):
        dst = src.with_suffix(".mp3")
        _append_log(job_id, f"[mp3] Converting: {src.name}")
        result = subprocess.run(
            ["ffmpeg", "-y", "-i", str(src), "-codec:v", "copy", "-q:a", "2", str(dst)],
            capture_output=True,
            text=True,
        )
        if result.returncode == 0:
            src.unlink()
            _append_log(job_id, f"[mp3] Done: {dst.name}")
        else:
            _append_log(
                job_id,
                f"[mp3] Failed: {src.name} - {result.stderr.strip()[-200:]}",
            )
    _append_log(job_id, "[mp3] Conversion complete.")


def post_process_votify_files(target_dir, job_id):
    """Flatten subdirs, rename files from metadata, wrap single tracks.

    Returns the list of paths produced by ``rename_from_metadata``.

    NOTE(review): this acts on every audio file under *target_dir*, not only
    the files of the current job, so with the root downloads directory as
    target, files in existing subfolders are moved up as well. Confirm that
    this is intended.
    """
    target_dir = Path(target_dir)

    # 1. Move any files in subdirectories up to target_dir
    for ext in AUDIO_EXTS:
        # Materialise the listing first because files are moved while looping.
        for f in list(target_dir.rglob(f"*{ext}")):
            if f.parent == target_dir:
                continue
            dest = target_dir / f.name
            counter = 2
            while dest.exists():  # avoid clobbering a same-named file
                dest = target_dir / f"{f.stem} ({counter}){f.suffix}"
                counter += 1
            shutil.move(str(f), str(dest))

    # 2. Rename from metadata
    renamed = []
    for ext in AUDIO_EXTS:
        for f in list(target_dir.glob(f"*{ext}")):
            new_path = rename_from_metadata(f)
            renamed.append(new_path)
            if new_path != f:
                _append_log(job_id, f"[post] Renamed: {new_path.name}")

    # 3. Clean up empty subdirs
    cleanup_empty_dirs(target_dir)

    # 4. Single-track wrapping (only if downloading to root downloads dir)
    if len(renamed) == 1 and target_dir == DOWNLOADS_DIR:
        f = renamed[0]
        folder_name = sanitize_filename(f.stem)
        wrapper = target_dir / folder_name
        wrapper.mkdir(exist_ok=True)
        shutil.move(str(f), str(wrapper / f.name))
        _append_log(job_id, f"[post] Wrapped single track in folder: {folder_name}")

    return renamed


def _build_votify_command(urls: list, options: dict, output_path: str = None) -> list:
    """Translate the job options into a votify argument list."""
    cmd = [
        "votify",
        "--cookies-path", str(COOKIES_PATH),
        "--output-path", output_path or str(DOWNLOADS_DIR),
        "--temp-path", str(TEMP_DIR),
    ]
    if WVD_PATH.exists():
        cmd.extend(["--wvd-path", str(WVD_PATH)])

    # Flatten folder structure — no nested Artist/Album subdirectories
    for template_flag in (
        "--template-folder-album",
        "--template-folder-compilation",
        "--template-folder-episode",
        "--template-folder-music-video",
    ):
        cmd.extend([template_flag, "."])

    valued_options = (
        ("audio_quality", "--audio-quality", "aac-medium"),
        ("download_mode", "--download-mode", "ytdlp"),
        ("video_format", "--video-format", "mp4"),
        ("cover_size", "--cover-size", "large"),
    )
    for key, flag, default in valued_options:
        value = options.get(key, default)
        if value:
            # str() so a non-string JSON value cannot break Popen / join.
            cmd.extend([flag, str(value)])

    switch_options = (
        ("save_cover", "--save-cover"),
        ("save_playlist", "--save-playlist"),
        ("overwrite", "--overwrite"),
        ("download_music_videos", "--download-music-videos"),
        ("save_lrc", "--lrc-only"),
        ("no_lrc", "--no-lrc"),
    )
    cmd.extend(flag for key, flag in switch_options if options.get(key))

    truncate = options.get("truncate")
    if truncate:
        cmd.extend(["--truncate", str(truncate)])

    cmd.extend(urls)
    return cmd


def run_download(job_id: str, urls: list[str], options: dict, output_path: str = None):
    """Run votify for *urls*, stream its output into the job, then post-process.

    On a zero exit code the downloaded files are post-processed and, when
    ``options["output_format"] == "mp3"``, converted to MP3. The job ends as
    "completed", "failed" or (if the user cancelled) stays "cancelled".
    """
    want_mp3 = options.get("output_format") == "mp3"
    try:
        cmd = _build_votify_command(urls, options, output_path)
        files_before = snapshot_audio_files(DOWNLOADS_DIR) if want_mp3 else None
        _update_job(job_id, status="running", command=" ".join(cmd))

        output_lines = []
        # The context manager closes the stdout pipe and waits for the child.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        ) as process:
            _update_job(job_id, process=process)
            for line in process.stdout:
                if _is_cancelled(job_id):
                    break
                output_lines.append(line.rstrip("\n"))
                _update_job(job_id, output=output_lines[-500:])
            # A cancel that arrived before the process handle was stored (or a
            # deleted job) never terminated the child; do it here so the wait
            # below cannot block on a process nobody is reading from.
            if process.poll() is None and _is_cancelled(job_id):
                process.terminate()

        if _is_cancelled(job_id):
            _append_log(job_id, "[cancelled] Job was cancelled by user.")
            return

        succeeded = process.returncode == 0
        if succeeded:
            target = Path(output_path) if output_path else DOWNLOADS_DIR
            post_process_votify_files(target, job_id)
            if want_mp3:
                convert_to_mp3(job_id, files_before)
        _update_job(
            job_id,
            status="completed" if succeeded else "failed",
            return_code=process.returncode,
        )
    except Exception as e:  # thread boundary: record the failure on the job
        _fail_job(job_id, str(e))


# --- Static / auth pages ---


@app.route("/sw.js")
def service_worker():
    """Serve the service worker script from the static folder."""
    return send_from_directory("static", "sw.js", mimetype="application/javascript")


@app.route("/offline")
def offline():
    """Serve the offline fallback page."""
    return send_from_directory("static", "offline.html")


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate the session on a correct password."""
    if request.method != "POST":
        return render_template("login.html", error=None)

    password = request.form.get("password", "")
    # Constant-time comparison; bytes so non-ASCII passwords are accepted.
    if hmac.compare_digest(password.encode("utf-8"), APP_PASSWORD.encode("utf-8")):
        session["authenticated"] = True
        return redirect(url_for("index"))
    return render_template("login.html", error="Incorrect password")


@app.route("/logout")
def logout():
    """Clear the session and go back to the login page."""
    session.clear()
    return redirect(url_for("login"))


@app.route("/")
def index():
    """Render the main page."""
    return render_template("index.html", auth_enabled=bool(APP_PASSWORD))


# --- Download endpoints ---


def _json_body() -> dict:
    """Return the request's JSON object, or {} when it is missing or invalid."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


@app.route("/api/download", methods=["POST"])
def start_download():
    """Queue a votify job for the newline-separated (or list of) URLs posted."""
    data = _json_body()

    raw_urls = data.get("urls", "")
    if isinstance(raw_urls, str):
        raw_urls = raw_urls.split("\n")
    elif not isinstance(raw_urls, list):
        raw_urls = []
    urls = [u.strip() for u in raw_urls if isinstance(u, str) and u.strip()]

    if not urls:
        return jsonify({"error": "No URLs provided"}), 400
    # URLs are appended to the votify command line; refuse anything that
    # would be parsed as an option instead of a URL.
    if any(u.startswith("-") for u in urls):
        return jsonify({"error": "Invalid URL"}), 400
    if not COOKIES_PATH.exists():
        return jsonify({"error": "cookies.txt not found. Mount it to /config/cookies.txt"}), 400

    options = {
        "audio_quality": data.get("audio_quality", "aac-medium"),
        "download_mode": data.get("download_mode", "ytdlp"),
        "video_format": data.get("video_format", "mp4"),
        "cover_size": data.get("cover_size", "large"),
        "save_cover": data.get("save_cover", False),
        "save_playlist": data.get("save_playlist", False),
        "overwrite": data.get("overwrite", False),
        "download_music_videos": data.get("download_music_videos", False),
        "save_lrc": data.get("save_lrc", False),
        "no_lrc": data.get("no_lrc", False),
        "truncate": data.get("truncate"),
        "output_format": data.get("output_format", "original"),
    }

    job_id = _create_job(urls, options)
    threading.Thread(
        target=run_download, args=(job_id, urls, options), daemon=True
    ).start()
    return jsonify({"job_id": job_id})


def _run_monochrome(job_id: str, url: str, quality: str) -> Optional[dict]:
    """Run a Monochrome download and record the outcome on the job.

    Returns the ``fail_info`` value reported by the downloader (``{}`` if it
    was empty), or ``None`` when the download raised.
    """
    _update_job(job_id, status="running")
    try:
        from monochrome.api import download_spotify_url

        success, _total, fail_info = download_spotify_url(
            spotify_url=url,
            quality=quality,
            output_dir=str(DOWNLOADS_DIR),
            log=lambda msg: _append_log(job_id, msg),
            cancel_check=lambda: _is_cancelled(job_id),
        )
    except Exception as e:  # thread boundary: record the failure on the job
        _fail_job(job_id, f"[error] {e}", return_code=1)
        return None

    with jobs_lock:
        job = jobs.get(job_id)
        # A cancelled job keeps its "cancelled" status.
        if job is not None and job["status"] != "cancelled":
            job["status"] = "completed" if success > 0 else "failed"
            job["return_code"] = 0 if success > 0 else 1
    return fail_info or {}


def run_monochrome_download(job_id: str, url: str, quality: str):
    """Thread entry point for a Monochrome-only download job."""
    _run_monochrome(job_id, url, quality)


@app.route("/api/monochrome/download", methods=["POST"])
def start_monochrome_download():
    """Queue a Monochrome job for a single URL at the requested quality."""
    data = _json_body()
    url = str(data.get("url") or "").strip()
    quality = data.get("quality", "HI_RES_LOSSLESS")

    if not url:
        return jsonify({"error": "No URL provided"}), 400

    valid_qualities = ["HI_RES_LOSSLESS", "LOSSLESS", "HIGH", "LOW", "MP3_320"]
    if quality not in valid_qualities:
        return jsonify({"error": f"Invalid quality. Choose from: {valid_qualities}"}), 400

    job_id = _create_job([url], {"quality": quality, "source": "monochrome"})
    threading.Thread(
        target=run_monochrome_download, args=(job_id, url, quality), daemon=True
    ).start()
    return jsonify({"job_id": job_id})


# --- Unified download (Monochrome → Votify fallback) ---
SETTINGS_PATH = CONFIG_DIR / "settings.json"


def load_settings():
    """Load settings.json, falling back to the defaults if missing or invalid."""
    defaults = {"fallback_quality": "aac-medium"}
    if not SETTINGS_PATH.exists():
        return defaults
    try:
        loaded = json.loads(SETTINGS_PATH.read_text())
    except (json.JSONDecodeError, OSError):
        return defaults
    # Callers use .get(), so anything other than a JSON object is unusable.
    return loaded if isinstance(loaded, dict) else defaults


def run_unified_download(job_id: str, url: str):
    """Download via Monochrome, then retry the failed tracks with Votify.

    The fallback runs as a separate job (on this same thread) whose id is
    written to the original job's output.
    """
    fail_info = _run_monochrome(job_id, url, "MP3_320")
    if fail_info is None:
        return

    # Check if we should spawn Votify fallback
    failed_urls = fail_info.get("failed_urls", [])
    if _is_cancelled(job_id) or not failed_urls:
        return

    # Spawn Votify fallback job for failed tracks
    fallback_quality = load_settings().get("fallback_quality", "aac-medium")
    subfolder = fail_info.get("subfolder")
    output_path = str(DOWNLOADS_DIR / subfolder) if subfolder else str(DOWNLOADS_DIR)

    votify_job_id = _create_job(
        failed_urls,
        {"audio_quality": fallback_quality, "source": "votify-fallback"},
    )
    _append_log(
        job_id,
        f"[monochrome] {len(failed_urls)} track(s) failed — starting Votify fallback (job {votify_job_id})",
    )
    run_download(
        votify_job_id,
        failed_urls,
        {"audio_quality": fallback_quality, "output_format": "mp3"},
        output_path=output_path,
    )


@app.route("/api/unified/download", methods=["POST"])
def start_unified_download():
    """Queue a unified (Monochrome with Votify fallback) job for one URL."""
    data = _json_body()
    url = str(data.get("url") or "").strip()
    if not url:
        return jsonify({"error": "No URL provided"}), 400

    job_id = _create_job([url], {"source": "unified"})
    threading.Thread(
        target=run_unified_download, args=(job_id, url), daemon=True
    ).start()
    return jsonify({"job_id": job_id})


@app.route("/api/settings", methods=["GET"])
def get_settings():
    """Return the current settings."""
    return jsonify(load_settings())


@app.route("/api/settings", methods=["POST"])
def save_settings():
    """Persist the fallback quality; unknown values fall back to aac-medium."""
    data = _json_body()
    allowed_qualities = ["aac-medium", "aac-high"]
    quality = data.get("fallback_quality", "aac-medium")
    if quality not in allowed_qualities:
        quality = "aac-medium"
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    SETTINGS_PATH.write_text(json.dumps({"fallback_quality": quality}))
    return jsonify({"ok": True})


# --- Job endpoints ---


def job_to_dict(job):
    """Return the job without its Popen handle, which is not JSON-serialisable."""
    return {k: v for k, v in job.items() if k != "process"}


@app.route("/api/jobs")
def list_jobs():
    """List all known jobs."""
    with jobs_lock:
        return jsonify([job_to_dict(j) for j in jobs.values()])


@app.route("/api/jobs/<job_id>")
def get_job(job_id):
    """Return one job, or 404 if the id is unknown."""
    with jobs_lock:
        job = jobs.get(job_id)
        if not job:
            return jsonify({"error": "Job not found"}), 404
        return jsonify(job_to_dict(job))


@app.route("/api/jobs/<job_id>/cancel", methods=["POST"])
def cancel_job(job_id):
    """Mark a running job cancelled and terminate its subprocess, if any."""
    with jobs_lock:
        job = jobs.get(job_id)
        if not job:
            return jsonify({"error": "Job not found"}), 404
        if job["status"] != "running":
            return jsonify({"error": "Job is not running"}), 400
        job["status"] = "cancelled"
        proc = job.get("process")
        if proc:
            proc.terminate()
    return jsonify({"ok": True})


@app.route("/api/jobs/<job_id>", methods=["DELETE"])
def delete_job(job_id):
    """Forget a job; succeeds even when the id is unknown."""
    with jobs_lock:
        jobs.pop(job_id, None)
    return jsonify({"ok": True})


# --- File browser endpoints ---


def _resolve_download_path(rel_path: str) -> Optional[Path]:
    """Resolve *rel_path* inside the downloads root.

    Returns the resolved absolute path (symlinks followed), or ``None`` when
    the path is malformed or ends up outside the downloads directory — which
    covers ``..`` segments, absolute paths and symlinks pointing elsewhere.
    """
    try:
        root = DOWNLOADS_DIR.resolve()
        candidate = (root / rel_path).resolve()
    except (OSError, ValueError):  # e.g. embedded NUL byte
        return None
    return candidate if candidate.is_relative_to(root) else None


@app.route("/api/files")
def list_files():
    """List a directory inside the downloads root, directories first."""
    target = _resolve_download_path(request.args.get("path", ""))
    if target is None:
        return jsonify({"error": "Invalid path"}), 403
    if not target.exists():
        return jsonify([])

    root = DOWNLOADS_DIR.resolve()
    items = []
    try:
        entries = sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower()))
        for entry in entries:
            items.append({
                "name": entry.name,
                "path": str(entry.relative_to(root)).replace("\\", "/"),
                "is_dir": entry.is_dir(),
                "size": entry.stat().st_size if entry.is_file() else None,
            })
    except (PermissionError, NotADirectoryError):
        # Best effort: return whatever could be listed.
        pass
    return jsonify(items)


@app.route("/api/files/download")
def download_file():
    """Send one file from the downloads root as an attachment."""
    target = _resolve_download_path(request.args.get("path", ""))
    if target is None:
        return jsonify({"error": "Invalid path"}), 403
    if not target.is_file():
        return jsonify({"error": "File not found"}), 404
    return send_from_directory(target.parent, target.name, as_attachment=True)


@app.route("/api/files/download-folder")
def download_folder():
    """Zip a folder inside the downloads root (in memory) and send it."""
    target = _resolve_download_path(request.args.get("path", ""))
    if target is None:
        return jsonify({"error": "Invalid path"}), 403
    if not target.is_dir():
        return jsonify({"error": "Folder not found"}), 404

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for file in target.rglob("*"):
            if file.is_file():
                zf.write(file, file.relative_to(target))

    folder_name = target.name or "downloads"
    return app.response_class(
        buf.getvalue(),
        mimetype="application/zip",
        headers={"Content-Disposition": f'attachment; filename="{folder_name}.zip"'},
    )


@app.route("/api/files/delete", methods=["DELETE"])
def delete_path():
    """Delete a file or folder inside the downloads root (never the root)."""
    rel_path = request.args.get("path", "")
    if not rel_path:
        return jsonify({"error": "Cannot delete root"}), 400

    target = _resolve_download_path(rel_path)
    if target is None:
        return jsonify({"error": "Invalid path"}), 403
    # Paths such as "." or "a/.." are non-empty but still resolve to the root.
    if target == DOWNLOADS_DIR.resolve():
        return jsonify({"error": "Cannot delete root"}), 400
    if not target.exists():
        return jsonify({"error": "Not found"}), 404

    try:
        if target.is_dir():
            shutil.rmtree(target)
        else:
            target.unlink()
    except OSError as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})


# --- Config file endpoints ---


def _save_upload(destination: Path):
    """Store the uploaded "file" form field at *destination*."""
    if "file" not in request.files:
        return jsonify({"error": "No file uploaded"}), 400
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    request.files["file"].save(destination)
    return jsonify({"ok": True})


@app.route("/api/cookies", methods=["GET"])
def check_cookies():
    """Report whether the cookies file exists."""
    return jsonify({"exists": COOKIES_PATH.exists()})


@app.route("/api/cookies", methods=["POST"])
def upload_cookies():
    """Replace the cookies file with the uploaded one."""
    return _save_upload(COOKIES_PATH)


@app.route("/api/wvd", methods=["GET"])
def check_wvd():
    """Report whether the WVD device file exists."""
    return jsonify({"exists": WVD_PATH.exists()})


@app.route("/api/wvd", methods=["POST"])
def upload_wvd():
    """Replace the WVD device file with the uploaded one."""
    return _save_upload(WVD_PATH)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)