360 lines
10 KiB
Python
360 lines
10 KiB
Python
import io
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import uuid
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
from flask import (
|
|
Flask,
|
|
jsonify,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
send_from_directory,
|
|
session,
|
|
url_for,
|
|
)
|
|
|
|
app = Flask(__name__)
# When SECRET_KEY is not set a fresh random key is generated at import time,
# so session cookies signed by a previous process will no longer validate.
app.secret_key = os.getenv("SECRET_KEY", os.urandom(32).hex())

# Auth
# An empty PASSWORD disables the login requirement entirely.
APP_PASSWORD = os.getenv("PASSWORD", "")
|
|
|
|
|
|
@app.before_request
def require_login():
    """Gate every request behind the session login when a password is set.

    Returns ``None`` (request proceeds) when auth is disabled, when the
    endpoint is one of the public ones, or when the session is already
    authenticated. Otherwise API paths get a JSON 401 and everything else is
    redirected to the login page.
    """
    public_endpoints = ("login", "static", "service_worker", "offline")
    if not APP_PASSWORD or request.endpoint in public_endpoints:
        return None
    if session.get("authenticated"):
        return None
    if request.path.startswith("/api/"):
        return jsonify({"error": "Unauthorized"}), 401
    return redirect(url_for("login"))
|
|
|
|
|
|
# Paths (each overridable through an environment variable of the same name,
# except the temp directory which is fixed).
DOWNLOADS_DIR = Path(os.getenv("DOWNLOADS_DIR", "/downloads"))
COOKIES_PATH = Path(os.getenv("COOKIES_PATH", "/config/cookies.txt"))
CONFIG_DIR = Path(os.getenv("CONFIG_DIR", "/config"))
TEMP_DIR = Path("/tmp/votify")

# Both working directories must exist before the first download starts.
DOWNLOADS_DIR.mkdir(exist_ok=True, parents=True)
TEMP_DIR.mkdir(exist_ok=True, parents=True)

# In-memory job tracking: job id -> job record. Every access goes through
# jobs_lock because worker threads and request handlers share this dict.
jobs: dict[str, dict] = {}
jobs_lock = threading.Lock()
|
|
|
|
|
|
def snapshot_audio_files(directory: Path) -> set[Path]:
    """Return every ``.m4a``, ``.ogg`` and ``.opus`` path found under *directory*.

    The search is recursive. The result is a set so two snapshots can be
    diffed to find files that appeared in between.
    """
    audio_suffixes = (".m4a", ".ogg", ".opus")
    return {
        match
        for suffix in audio_suffixes
        for match in directory.rglob(f"*{suffix}")
    }
|
|
|
|
|
|
def convert_to_mp3(job_id: str, before: set[Path]):
    """Convert audio files that appeared since *before* to MP3 with ffmpeg.

    Args:
        job_id: Job whose ``output`` log receives progress lines.
        before: Snapshot of audio files taken before the download started;
            only files missing from it are converted.

    Each successfully converted source file is deleted. A failure on one file
    (non-zero ffmpeg exit, or ffmpeg not being launchable) is logged and the
    remaining files are still processed.
    """
    new_files = snapshot_audio_files(DOWNLOADS_DIR) - before
    if not new_files:
        return

    def log(msg):
        with jobs_lock:
            job = jobs.get(job_id)
            # The job may have been deleted through the API while we run.
            if job is None:
                return
            # Same 500-line cap the download loop applies to job output.
            job["output"] = (job.get("output", []) + [msg])[-500:]

    log(f"[mp3] Converting {len(new_files)} file(s) to MP3...")
    for src in sorted(new_files):
        dst = src.with_suffix(".mp3")
        log(f"[mp3] Converting: {src.name}")
        try:
            result = subprocess.run(
                ["ffmpeg", "-y", "-i", str(src), "-codec:v", "copy", "-q:a", "2", str(dst)],
                capture_output=True,
                text=True,
                # ffmpeg may echo metadata that is not valid UTF-8.
                errors="replace",
            )
        except OSError as exc:
            # e.g. ffmpeg is not installed; report it instead of aborting the job.
            log(f"[mp3] Failed: {src.name} - {exc}")
            continue

        if result.returncode == 0:
            src.unlink(missing_ok=True)
            log(f"[mp3] Done: {dst.name}")
        else:
            log(f"[mp3] Failed: {src.name} - {result.stderr.strip()[-200:]}")

    log("[mp3] Conversion complete.")
|
|
|
|
|
|
def _build_votify_command(urls: list[str], options: dict) -> list[str]:
    """Translate the request options into a votify argument list."""
    cmd = [
        "votify",
        "--cookies-path", str(COOKIES_PATH),
        "--output-path", str(DOWNLOADS_DIR),
        "--temp-path", str(TEMP_DIR),
    ]

    # (option key, CLI flag, default used when the key is absent)
    valued_options = (
        ("audio_quality", "--audio-quality", "aac-medium"),
        ("download_mode", "--download-mode", "ytdlp"),
        ("video_format", "--video-format", "mp4"),
        ("cover_size", "--cover-size", "large"),
    )
    for key, flag, default in valued_options:
        value = options.get(key, default)
        if value:
            # str() keeps a non-string JSON value from breaking join/Popen.
            cmd.extend([flag, str(value)])

    # NOTE(review): "save_lrc" maps to --lrc-only; confirm against the votify
    # CLI that this is the intended flag for that option.
    boolean_flags = (
        ("save_cover", "--save-cover"),
        ("save_playlist", "--save-playlist"),
        ("overwrite", "--overwrite"),
        ("download_music_videos", "--download-music-videos"),
        ("save_lrc", "--lrc-only"),
        ("no_lrc", "--no-lrc"),
    )
    cmd.extend(flag for key, flag in boolean_flags if options.get(key))

    truncate = options.get("truncate")
    if truncate:
        cmd.extend(["--truncate", str(truncate)])

    cmd.extend(urls)
    return cmd


def _update_job(job_id: str, **fields) -> None:
    """Set *fields* on the job record, doing nothing if the job was deleted."""
    with jobs_lock:
        job = jobs.get(job_id)
        if job is not None:
            job.update(fields)


def run_download(job_id: str, urls: list[str], options: dict):
    """Run votify for *urls* and mirror its progress into the job record.

    Args:
        job_id: Key of the job in ``jobs`` to update.
        urls: URLs appended to the votify command line.
        options: Download options; ``output_format == "mp3"`` additionally
            triggers an MP3 conversion of newly downloaded audio files.

    The job's ``status`` moves to ``running`` and then ``completed`` or
    ``failed``; ``command``, ``output`` (last 500 lines) and ``return_code``
    are filled in along the way. Any exception marks the job as failed and
    appends the error text to its output. If the job is deleted while the
    worker runs, updates are silently dropped instead of raising.
    """
    max_output_lines = 500
    try:
        cmd = _build_votify_command(urls, options)

        want_mp3 = options.get("output_format") == "mp3"
        files_before = snapshot_audio_files(DOWNLOADS_DIR) if want_mp3 else None

        _update_job(job_id, status="running", command=" ".join(cmd))

        # The context manager closes the pipe and waits for the process.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors="replace",
            bufsize=1,
        ) as process:
            recent_lines = []
            for line in process.stdout:
                recent_lines.append(line.rstrip("\n"))
                # Trim in place so memory stays bounded on long downloads.
                del recent_lines[:-max_output_lines]
                # Publish a copy: the record is read by request threads.
                _update_job(job_id, output=list(recent_lines))

        return_code = process.returncode
        if return_code == 0 and want_mp3:
            convert_to_mp3(job_id, files_before)

        _update_job(
            job_id,
            status="completed" if return_code == 0 else "failed",
            return_code=return_code,
        )
    except Exception as e:
        with jobs_lock:
            job = jobs.get(job_id)
            if job is not None:
                job["status"] = "failed"
                job["output"] = job.get("output", []) + [str(e)]
|
|
|
|
|
|
@app.route("/sw.js")
def service_worker():
    """Serve ``static/sw.js`` at the site root with a JavaScript MIME type."""
    script_name = "sw.js"
    return send_from_directory(
        "static",
        script_name,
        mimetype="application/javascript",
    )
|
|
|
|
|
|
@app.route("/offline")
def offline():
    """Serve the static offline fallback page."""
    page_name = "offline.html"
    return send_from_directory("static", page_name)
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate the session on a correct password.

    GET renders the form. POST compares the submitted ``password`` form field
    with ``APP_PASSWORD``; on a match the session is marked authenticated and
    the client is redirected to the index, otherwise the form is re-rendered
    with an error message.
    """
    import hmac

    if request.method != "POST":
        return render_template("login.html", error=None)

    submitted = request.form.get("password", "")
    # Constant-time comparison so response timing does not leak how much of
    # the password matched. Bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if hmac.compare_digest(submitted.encode("utf-8"), APP_PASSWORD.encode("utf-8")):
        session["authenticated"] = True
        return redirect(url_for("index"))
    return render_template("login.html", error="Incorrect password")
|
|
|
|
|
|
@app.route("/logout")
def logout():
    """Drop all session data and send the client back to the login page."""
    session.clear()
    login_url = url_for("login")
    return redirect(login_url)
|
|
|
|
|
|
@app.route("/")
def index():
    """Render the main page, telling the template whether auth is enabled."""
    auth_enabled = bool(APP_PASSWORD)
    return render_template("index.html", auth_enabled=auth_enabled)
|
|
|
|
|
|
@app.route("/api/download", methods=["POST"])
def start_download():
    """Validate a download request, register a job and start its worker thread.

    The body must be a JSON object. ``urls`` is a newline-separated string
    (a list of strings is accepted as well); the remaining keys are download
    options that fall back to the defaults listed below.

    Returns:
        ``{"job_id": ...}`` on success, or a JSON ``error`` with status 400
        when the body, the URLs or the cookies file are missing or invalid.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    raw_urls = data.get("urls", "")
    if isinstance(raw_urls, str):
        raw_urls = raw_urls.split("\n")
    if not isinstance(raw_urls, list) or not all(isinstance(u, str) for u in raw_urls):
        return jsonify({"error": "urls must be a string or a list of strings"}), 400

    urls = [u.strip() for u in raw_urls if u.strip()]
    if not urls:
        return jsonify({"error": "No URLs provided"}), 400
    # Entries are appended verbatim to the votify command line; anything that
    # starts with "-" would be parsed as an option rather than a URL.
    if any(u.startswith("-") for u in urls):
        return jsonify({"error": "URLs must not start with '-'"}), 400

    if not COOKIES_PATH.exists():
        return jsonify({"error": "cookies.txt not found. Mount it to /config/cookies.txt"}), 400

    options = {
        "audio_quality": data.get("audio_quality", "aac-medium"),
        "download_mode": data.get("download_mode", "ytdlp"),
        "video_format": data.get("video_format", "mp4"),
        "cover_size": data.get("cover_size", "large"),
        "save_cover": data.get("save_cover", False),
        "save_playlist": data.get("save_playlist", False),
        "overwrite": data.get("overwrite", False),
        "download_music_videos": data.get("download_music_videos", False),
        "save_lrc": data.get("save_lrc", False),
        "no_lrc": data.get("no_lrc", False),
        "truncate": data.get("truncate"),
        "output_format": data.get("output_format", "original"),
    }

    with jobs_lock:
        # Ids are only 8 characters long; regenerate on the rare collision so
        # an existing job record is never overwritten.
        job_id = str(uuid.uuid4())[:8]
        while job_id in jobs:
            job_id = str(uuid.uuid4())[:8]
        jobs[job_id] = {
            "id": job_id,
            "urls": urls,
            "options": options,
            "status": "queued",
            "output": [],
            "created_at": time.time(),
        }

    thread = threading.Thread(target=run_download, args=(job_id, urls, options), daemon=True)
    thread.start()

    return jsonify({"job_id": job_id})
|
|
|
|
|
|
@app.route("/api/jobs")
def list_jobs():
    """Return every tracked job record as a JSON array."""
    with jobs_lock:
        all_jobs = list(jobs.values())
        return jsonify(all_jobs)
|
|
|
|
|
|
@app.route("/api/jobs/<job_id>")
def get_job(job_id):
    """Return the record of a single job, or a JSON 404 if it is unknown."""
    with jobs_lock:
        record = jobs.get(job_id)
        if record:
            return jsonify(record)
    return jsonify({"error": "Job not found"}), 404
|
|
|
|
|
|
@app.route("/api/jobs/<job_id>", methods=["DELETE"])
def delete_job(job_id):
    """Forget a job record; unknown ids are ignored and still answer ``ok``."""
    with jobs_lock:
        jobs.pop(job_id, None)
    return jsonify({"ok": True})
|
|
|
|
|
|
@app.route("/api/files")
def list_files():
    """List the entries of a directory inside the downloads folder.

    Query args:
        path: Directory relative to the downloads root (empty for the root).

    Returns:
        A JSON array of ``{"name", "path", "is_dir", "size"}`` objects with
        directories first, then case-insensitive name order. A missing or
        non-directory path yields an empty array; a path that escapes the
        downloads root yields a JSON 403.
    """
    rel_path = request.args.get("path", "")
    root = DOWNLOADS_DIR.resolve()
    try:
        target = (root / rel_path).resolve()
    except (OSError, ValueError):
        return jsonify({"error": "Invalid path"}), 403
    # Resolve before checking: "../" segments and absolute paths would
    # otherwise let the caller browse anything on the filesystem.
    if not target.is_relative_to(root):
        return jsonify({"error": "Invalid path"}), 403
    if not target.is_dir():
        return jsonify([])

    items = []
    try:
        entries = sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower()))
        for entry in entries:
            rel = entry.relative_to(root)
            items.append({
                "name": entry.name,
                "path": str(rel).replace("\\", "/"),
                "is_dir": entry.is_dir(),
                "size": entry.stat().st_size if entry.is_file() else None,
            })
    except PermissionError:
        # Best effort: return whatever could be listed before access failed.
        pass
    return jsonify(items)
|
|
|
|
|
|
@app.route("/api/files/download")
def download_file():
    """Send a single file from the downloads folder as an attachment.

    Query args:
        path: File path relative to the downloads root.

    Returns:
        The file, a JSON 403 if the path escapes the downloads root, or a
        JSON 404 if it does not point at a regular file.
    """
    rel_path = request.args.get("path", "")
    root = DOWNLOADS_DIR.resolve()
    try:
        target = (root / rel_path).resolve()
    except (OSError, ValueError):
        return jsonify({"error": "Invalid path"}), 403
    # Without this check "../" or an absolute path could read any file the
    # process can access (including the cookies file).
    if not target.is_relative_to(root):
        return jsonify({"error": "Invalid path"}), 403
    if not target.is_file():
        return jsonify({"error": "File not found"}), 404
    return send_from_directory(target.parent, target.name, as_attachment=True)
|
|
|
|
|
|
@app.route("/api/files/download-folder")
def download_folder():
    """Zip a directory inside the downloads folder and send it as an attachment.

    Query args:
        path: Directory relative to the downloads root (empty for the root).

    Returns:
        A ZIP archive named after the directory, a JSON 403 if the path
        escapes the downloads root, or a JSON 404 if it is not a directory.
        The archive is built fully in memory before being sent.
    """
    rel_path = request.args.get("path", "")
    root = DOWNLOADS_DIR.resolve()
    try:
        target = (root / rel_path).resolve()
    except (OSError, ValueError):
        return jsonify({"error": "Invalid path"}), 403
    # Keep the archive source inside the downloads root; "../" or an absolute
    # path would otherwise allow zipping arbitrary directories.
    if not target.is_relative_to(root):
        return jsonify({"error": "Invalid path"}), 403
    if not target.is_dir():
        return jsonify({"error": "Folder not found"}), 404

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for file in target.rglob("*"):
            if file.is_file():
                # Store paths relative to the zipped folder, not absolute ones.
                zf.write(file, file.relative_to(target))

    folder_name = target.name or "downloads"
    return app.response_class(
        buf.getvalue(),
        mimetype="application/zip",
        headers={"Content-Disposition": f'attachment; filename="{folder_name}.zip"'},
    )
|
|
|
|
|
|
@app.route("/api/files/delete", methods=["DELETE"])
def delete_path():
    """Delete a file or directory tree inside the downloads folder.

    Query args:
        path: Path relative to the downloads root; must not be the root.

    Returns:
        ``{"ok": true}`` on success, otherwise a JSON error: 400 for the root
        itself, 403 for a path outside the downloads folder, 404 if nothing
        exists there, 500 if the deletion raised.
    """
    rel_path = request.args.get("path", "")
    if not rel_path:
        return jsonify({"error": "Cannot delete root"}), 400

    root = DOWNLOADS_DIR.resolve()
    target = DOWNLOADS_DIR / rel_path
    try:
        resolved = target.resolve()
    except (OSError, ValueError):
        return jsonify({"error": "Invalid path"}), 403

    # "." or "sub/.." are non-empty but still resolve to the root.
    if resolved == root:
        return jsonify({"error": "Cannot delete root"}), 400
    # Component-wise containment: a plain string prefix test would also
    # accept sibling directories such as "/downloads-old".
    if not resolved.is_relative_to(root):
        return jsonify({"error": "Invalid path"}), 403
    if not target.exists():
        return jsonify({"error": "Not found"}), 404

    try:
        if target.is_dir():
            shutil.rmtree(target)
        else:
            target.unlink()
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})
|
|
|
|
|
|
@app.route("/api/cookies", methods=["GET"])
def check_cookies():
    """Report whether the cookies file currently exists."""
    cookies_present = COOKIES_PATH.exists()
    return jsonify({"exists": cookies_present})
|
|
|
|
|
|
@app.route("/api/cookies", methods=["POST"])
def upload_cookies():
    """Store an uploaded cookies file at ``COOKIES_PATH``.

    Expects a multipart upload with the file under the ``file`` field.

    Returns:
        ``{"ok": true}`` after saving, or a JSON 400 if no file was sent.
    """
    uploaded = request.files.get("file")
    if uploaded is None:
        return jsonify({"error": "No file uploaded"}), 400

    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    # COOKIES_PATH is configured independently of CONFIG_DIR, so its parent
    # directory has to be created as well or the save below can fail.
    COOKIES_PATH.parent.mkdir(parents=True, exist_ok=True)
    uploaded.save(COOKIES_PATH)
    return jsonify({"ok": True})
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: listen on all interfaces, port 5000, debugger off.
    app.run(debug=False, port=5000, host="0.0.0.0")
|