import io import os import shutil import subprocess import threading import time import uuid import zipfile from pathlib import Path from flask import ( Flask, jsonify, redirect, render_template, request, send_from_directory, session, url_for, ) app = Flask(__name__) app.secret_key = os.environ.get("SECRET_KEY", os.urandom(32).hex()) # Auth APP_PASSWORD = os.environ.get("PASSWORD", "") @app.before_request def require_login(): if not APP_PASSWORD: return if request.endpoint in ("login", "static"): return if not session.get("authenticated"): if request.path.startswith("/api/"): return jsonify({"error": "Unauthorized"}), 401 return redirect(url_for("login")) # Paths DOWNLOADS_DIR = Path(os.environ.get("DOWNLOADS_DIR", "/downloads")) COOKIES_PATH = Path(os.environ.get("COOKIES_PATH", "/config/cookies.txt")) CONFIG_DIR = Path(os.environ.get("CONFIG_DIR", "/config")) TEMP_DIR = Path("/tmp/votify") DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True) TEMP_DIR.mkdir(parents=True, exist_ok=True) # In-memory job tracking jobs: dict[str, dict] = {} jobs_lock = threading.Lock() def snapshot_audio_files(directory: Path) -> set[Path]: extensions = {".m4a", ".ogg", ".opus"} files = set() for ext in extensions: files.update(directory.rglob(f"*{ext}")) return files def convert_to_mp3(job_id: str, before: set[Path]): after = snapshot_audio_files(DOWNLOADS_DIR) new_files = after - before if not new_files: return def log(msg): with jobs_lock: jobs[job_id]["output"] = jobs[job_id].get("output", [])[-500:] + [msg] log(f"[mp3] Converting {len(new_files)} file(s) to MP3...") for src in sorted(new_files): dst = src.with_suffix(".mp3") log(f"[mp3] Converting: {src.name}") result = subprocess.run( ["ffmpeg", "-y", "-i", str(src), "-codec:v", "copy", "-q:a", "2", str(dst)], capture_output=True, text=True, ) if result.returncode == 0: src.unlink() log(f"[mp3] Done: {dst.name}") else: log(f"[mp3] Failed: {src.name} - {result.stderr.strip()[-200:]}") log("[mp3] Conversion complete.") def run_download(job_id: str, urls: list[str], options: dict): cmd = ["votify"] cmd.extend(["--cookies-path", str(COOKIES_PATH)]) cmd.extend(["--output-path", str(DOWNLOADS_DIR)]) cmd.extend(["--temp-path", str(TEMP_DIR)]) quality = options.get("audio_quality", "aac-medium") if quality: cmd.extend(["--audio-quality", quality]) download_mode = options.get("download_mode", "ytdlp") if download_mode: cmd.extend(["--download-mode", download_mode]) video_format = options.get("video_format", "mp4") if video_format: cmd.extend(["--video-format", video_format]) cover_size = options.get("cover_size", "large") if cover_size: cmd.extend(["--cover-size", cover_size]) if options.get("save_cover"): cmd.append("--save-cover") if options.get("save_playlist"): cmd.append("--save-playlist") if options.get("overwrite"): cmd.append("--overwrite") if options.get("download_music_videos"): cmd.append("--download-music-videos") if options.get("save_lrc"): cmd.append("--lrc-only") if options.get("no_lrc"): cmd.append("--no-lrc") truncate = options.get("truncate") if truncate: cmd.extend(["--truncate", str(truncate)]) cmd.extend(urls) want_mp3 = options.get("output_format") == "mp3" files_before = snapshot_audio_files(DOWNLOADS_DIR) if want_mp3 else None with jobs_lock: jobs[job_id]["status"] = "running" jobs[job_id]["command"] = " ".join(cmd) try: process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) output_lines = [] for line in process.stdout: line = line.rstrip("\n") output_lines.append(line) with jobs_lock: jobs[job_id]["output"] = output_lines[-500:] process.wait() if process.returncode == 0 and want_mp3: convert_to_mp3(job_id, files_before) with jobs_lock: jobs[job_id]["status"] = "completed" if process.returncode == 0 else "failed" jobs[job_id]["return_code"] = process.returncode except Exception as e: with jobs_lock: jobs[job_id]["status"] = "failed" jobs[job_id]["output"] = jobs[job_id].get("output", []) + [str(e)] @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": password = request.form.get("password", "") if password == APP_PASSWORD: session["authenticated"] = True return redirect(url_for("index")) return render_template("login.html", error="Incorrect password") return render_template("login.html", error=None) @app.route("/logout") def logout(): session.clear() return redirect(url_for("login")) @app.route("/") def index(): return render_template("index.html", auth_enabled=bool(APP_PASSWORD)) @app.route("/api/download", methods=["POST"]) def start_download(): data = request.json urls = [u.strip() for u in data.get("urls", "").split("\n") if u.strip()] if not urls: return jsonify({"error": "No URLs provided"}), 400 if not COOKIES_PATH.exists(): return jsonify({"error": "cookies.txt not found. Mount it to /config/cookies.txt"}), 400 options = { "audio_quality": data.get("audio_quality", "aac-medium"), "download_mode": data.get("download_mode", "ytdlp"), "video_format": data.get("video_format", "mp4"), "cover_size": data.get("cover_size", "large"), "save_cover": data.get("save_cover", False), "save_playlist": data.get("save_playlist", False), "overwrite": data.get("overwrite", False), "download_music_videos": data.get("download_music_videos", False), "save_lrc": data.get("save_lrc", False), "no_lrc": data.get("no_lrc", False), "truncate": data.get("truncate"), "output_format": data.get("output_format", "original"), } job_id = str(uuid.uuid4())[:8] with jobs_lock: jobs[job_id] = { "id": job_id, "urls": urls, "options": options, "status": "queued", "output": [], "created_at": time.time(), } thread = threading.Thread(target=run_download, args=(job_id, urls, options), daemon=True) thread.start() return jsonify({"job_id": job_id}) @app.route("/api/jobs") def list_jobs(): with jobs_lock: return jsonify(list(jobs.values())) @app.route("/api/jobs/") def get_job(job_id): with jobs_lock: job = jobs.get(job_id) if not job: return jsonify({"error": "Job not found"}), 404 return jsonify(job) @app.route("/api/jobs/", methods=["DELETE"]) def delete_job(job_id): with jobs_lock: if job_id in jobs: del jobs[job_id] return jsonify({"ok": True}) @app.route("/api/files") def list_files(): rel_path = request.args.get("path", "") target = DOWNLOADS_DIR / rel_path if not target.exists(): return jsonify([]) items = [] try: for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())): rel = entry.relative_to(DOWNLOADS_DIR) items.append({ "name": entry.name, "path": str(rel).replace("\\", "/"), "is_dir": entry.is_dir(), "size": entry.stat().st_size if entry.is_file() else None, }) except PermissionError: pass return jsonify(items) @app.route("/api/files/download") def download_file(): rel_path = request.args.get("path", "") target = DOWNLOADS_DIR / rel_path if not target.is_file(): return jsonify({"error": "File not found"}), 404 return send_from_directory(target.parent, target.name, as_attachment=True) @app.route("/api/files/download-folder") def download_folder(): rel_path = request.args.get("path", "") target = DOWNLOADS_DIR / rel_path if not target.is_dir(): return jsonify({"error": "Folder not found"}), 404 buf = io.BytesIO() with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: for file in target.rglob("*"): if file.is_file(): zf.write(file, file.relative_to(target)) buf.seek(0) folder_name = target.name or "downloads" return app.response_class( buf.getvalue(), mimetype="application/zip", headers={"Content-Disposition": f'attachment; filename="{folder_name}.zip"'}, ) @app.route("/api/files/delete", methods=["DELETE"]) def delete_path(): rel_path = request.args.get("path", "") if not rel_path: return jsonify({"error": "Cannot delete root"}), 400 target = DOWNLOADS_DIR / rel_path if not target.exists(): return jsonify({"error": "Not found"}), 404 if not str(target.resolve()).startswith(str(DOWNLOADS_DIR.resolve())): return jsonify({"error": "Invalid path"}), 403 try: if target.is_dir(): shutil.rmtree(target) else: target.unlink() except Exception as e: return jsonify({"error": str(e)}), 500 return jsonify({"ok": True}) @app.route("/api/cookies", methods=["GET"]) def check_cookies(): return jsonify({"exists": COOKIES_PATH.exists()}) @app.route("/api/cookies", methods=["POST"]) def upload_cookies(): if "file" not in request.files: return jsonify({"error": "No file uploaded"}), 400 file = request.files["file"] CONFIG_DIR.mkdir(parents=True, exist_ok=True) file.save(COOKIES_PATH) return jsonify({"ok": True}) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=False)