1078 lines
35 KiB
Python
1078 lines
35 KiB
Python
import io
|
||
import json
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
import uuid
|
||
import zipfile
|
||
from pathlib import Path
|
||
|
||
import db as database
|
||
from utils import rename_from_metadata, sanitize_filename, cleanup_empty_dirs
|
||
|
||
from flask import (
|
||
Flask,
|
||
jsonify,
|
||
redirect,
|
||
render_template,
|
||
request,
|
||
send_from_directory,
|
||
session,
|
||
url_for,
|
||
)
|
||
|
||
app = Flask(__name__)
|
||
app.secret_key = os.environ.get("SECRET_KEY", os.urandom(32).hex())
|
||
|
||
# Paths
|
||
DOWNLOADS_DIR = Path(os.environ.get("DOWNLOADS_DIR", "/downloads"))
|
||
COOKIES_PATH = Path(os.environ.get("COOKIES_PATH", "/config/cookies.txt"))
|
||
CONFIG_DIR = Path(os.environ.get("CONFIG_DIR", "/config"))
|
||
WVD_PATH = Path(os.environ.get("WVD_PATH", "/config/device.wvd"))
|
||
TEMP_DIR = Path("/tmp/votify")
|
||
|
||
DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)
|
||
TEMP_DIR.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Database
|
||
DB_PATH = CONFIG_DIR / "trackpull.db"
|
||
database.init_db(DB_PATH)
|
||
|
||
# In-memory job tracking
|
||
jobs: dict[str, dict] = {}
|
||
jobs_lock = threading.Lock()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Auth helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.before_request
|
||
def require_login():
|
||
public = {"login", "logout", "static", "service_worker", "offline"}
|
||
if request.endpoint in public:
|
||
return
|
||
if not session.get("user_id"):
|
||
if request.path.startswith("/api/"):
|
||
return jsonify({"error": "Unauthorized"}), 401
|
||
return redirect(url_for("login"))
|
||
|
||
|
||
def require_admin():
|
||
if session.get("role") != "admin":
|
||
return jsonify({"error": "Forbidden"}), 403
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-user download directory
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def get_user_downloads_dir(user_id: str) -> Path:
|
||
d = DOWNLOADS_DIR / user_id
|
||
d.mkdir(parents=True, exist_ok=True)
|
||
return d
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Audio helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def snapshot_audio_files(directory: Path) -> set[Path]:
|
||
extensions = {".m4a", ".ogg", ".opus"}
|
||
files = set()
|
||
for ext in extensions:
|
||
files.update(directory.rglob(f"*{ext}"))
|
||
return files
|
||
|
||
|
||
def convert_to_mp3(job_id: str, before: set[Path], user_dir: Path):
|
||
after = snapshot_audio_files(user_dir)
|
||
new_files = after - before
|
||
if not new_files:
|
||
return
|
||
|
||
def log(msg):
|
||
with jobs_lock:
|
||
jobs[job_id]["output"] = jobs[job_id].get("output", [])[-500:] + [msg]
|
||
|
||
log(f"[mp3] Converting {len(new_files)} file(s) to MP3...")
|
||
for src in sorted(new_files):
|
||
dst = src.with_suffix(".mp3")
|
||
log(f"[mp3] Converting: {src.name}")
|
||
result = subprocess.run(
|
||
["ffmpeg", "-y", "-i", str(src), "-codec:v", "copy", "-q:a", "2", str(dst)],
|
||
capture_output=True, text=True,
|
||
)
|
||
if result.returncode == 0:
|
||
src.unlink()
|
||
log(f"[mp3] Done: {dst.name}")
|
||
else:
|
||
log(f"[mp3] Failed: {src.name} - {result.stderr.strip()[-200:]}")
|
||
|
||
log("[mp3] Conversion complete.")
|
||
|
||
|
||
AUDIO_EXTS = {".m4a", ".ogg", ".opus", ".mp3", ".flac"}
|
||
|
||
|
||
def post_process_votify_files(target_dir, job_id, user_dir: Path):
|
||
"""Flatten subdirs, rename files from metadata, wrap single tracks."""
|
||
target_dir = Path(target_dir)
|
||
|
||
def log(msg):
|
||
with jobs_lock:
|
||
jobs[job_id]["output"] = jobs[job_id].get("output", [])[-500:] + [msg]
|
||
|
||
# 1. Move any files in subdirectories up to target_dir
|
||
for ext in AUDIO_EXTS:
|
||
for f in list(target_dir.rglob(f"*{ext}")):
|
||
if f.parent != target_dir:
|
||
dest = target_dir / f.name
|
||
counter = 2
|
||
while dest.exists():
|
||
dest = target_dir / f"{f.stem} ({counter}){f.suffix}"
|
||
counter += 1
|
||
shutil.move(str(f), str(dest))
|
||
|
||
# 2. Rename from metadata
|
||
renamed = []
|
||
for ext in AUDIO_EXTS:
|
||
for f in list(target_dir.glob(f"*{ext}")):
|
||
new_path = rename_from_metadata(f)
|
||
renamed.append(new_path)
|
||
if new_path != f:
|
||
log(f"[post] Renamed: {new_path.name}")
|
||
|
||
# 3. Clean up empty subdirs
|
||
cleanup_empty_dirs(target_dir)
|
||
|
||
# 4. Single-track wrapping (only if downloading to user's root dir)
|
||
if len(renamed) == 1 and target_dir == user_dir:
|
||
f = renamed[0]
|
||
folder_name = sanitize_filename(f.stem)
|
||
wrapper = target_dir / folder_name
|
||
wrapper.mkdir(exist_ok=True)
|
||
shutil.move(str(f), str(wrapper / f.name))
|
||
log(f"[post] Wrapped single track in folder: {folder_name}")
|
||
|
||
return renamed
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Download runners
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def run_download(job_id: str, urls: list[str], options: dict,
|
||
output_path: str = None, user_id: str = None):
|
||
user_dir = get_user_downloads_dir(user_id) if user_id else DOWNLOADS_DIR
|
||
effective_output = output_path or str(user_dir)
|
||
|
||
cmd = ["votify"]
|
||
cmd.extend(["--cookies-path", str(COOKIES_PATH)])
|
||
cmd.extend(["--output-path", effective_output])
|
||
cmd.extend(["--temp-path", str(TEMP_DIR)])
|
||
if WVD_PATH.exists():
|
||
cmd.extend(["--wvd-path", str(WVD_PATH)])
|
||
|
||
cmd.extend(["--template-folder-album", "."])
|
||
cmd.extend(["--template-folder-compilation", "."])
|
||
cmd.extend(["--template-folder-episode", "."])
|
||
cmd.extend(["--template-folder-music-video", "."])
|
||
|
||
quality = options.get("audio_quality", "aac-medium")
|
||
if quality:
|
||
cmd.extend(["--audio-quality", quality])
|
||
|
||
download_mode = options.get("download_mode", "ytdlp")
|
||
if download_mode:
|
||
cmd.extend(["--download-mode", download_mode])
|
||
|
||
video_format = options.get("video_format", "mp4")
|
||
if video_format:
|
||
cmd.extend(["--video-format", video_format])
|
||
|
||
cover_size = options.get("cover_size", "large")
|
||
if cover_size:
|
||
cmd.extend(["--cover-size", cover_size])
|
||
|
||
if options.get("save_cover"):
|
||
cmd.append("--save-cover")
|
||
if options.get("save_playlist"):
|
||
cmd.append("--save-playlist")
|
||
if options.get("overwrite"):
|
||
cmd.append("--overwrite")
|
||
if options.get("download_music_videos"):
|
||
cmd.append("--download-music-videos")
|
||
if options.get("save_lrc"):
|
||
cmd.append("--lrc-only")
|
||
if options.get("no_lrc"):
|
||
cmd.append("--no-lrc")
|
||
|
||
truncate = options.get("truncate")
|
||
if truncate:
|
||
cmd.extend(["--truncate", str(truncate)])
|
||
|
||
cmd.extend(urls)
|
||
|
||
want_mp3 = options.get("output_format") == "mp3"
|
||
files_before = snapshot_audio_files(user_dir) if want_mp3 else None
|
||
|
||
with jobs_lock:
|
||
jobs[job_id]["status"] = "running"
|
||
jobs[job_id]["command"] = " ".join(cmd)
|
||
|
||
try:
|
||
process = subprocess.Popen(
|
||
cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
text=True,
|
||
bufsize=1,
|
||
)
|
||
with jobs_lock:
|
||
jobs[job_id]["process"] = process
|
||
|
||
output_lines = []
|
||
for line in process.stdout:
|
||
with jobs_lock:
|
||
if jobs[job_id]["status"] == "cancelled":
|
||
break
|
||
line = line.rstrip("\n")
|
||
output_lines.append(line)
|
||
with jobs_lock:
|
||
jobs[job_id]["output"] = output_lines[-500:]
|
||
|
||
process.wait()
|
||
|
||
with jobs_lock:
|
||
cancelled = jobs[job_id]["status"] == "cancelled"
|
||
|
||
if cancelled:
|
||
with jobs_lock:
|
||
jobs[job_id]["output"] = jobs[job_id].get("output", []) + ["[cancelled] Job was cancelled by user."]
|
||
else:
|
||
if process.returncode == 0:
|
||
target = Path(effective_output)
|
||
post_process_votify_files(target, job_id, user_dir)
|
||
if want_mp3:
|
||
convert_to_mp3(job_id, files_before, user_dir)
|
||
with jobs_lock:
|
||
jobs[job_id]["status"] = "completed" if process.returncode == 0 else "failed"
|
||
jobs[job_id]["return_code"] = process.returncode
|
||
except Exception as e:
|
||
with jobs_lock:
|
||
jobs[job_id]["status"] = "failed"
|
||
jobs[job_id]["output"] = jobs[job_id].get("output", []) + [str(e)]
|
||
|
||
with jobs_lock:
|
||
database.upsert_job(jobs[job_id])
|
||
|
||
|
||
def run_monochrome_download(job_id: str, url: str, quality: str, user_id: str):
|
||
user_dir = get_user_downloads_dir(user_id)
|
||
|
||
with jobs_lock:
|
||
jobs[job_id]["status"] = "running"
|
||
|
||
def log(msg):
|
||
with jobs_lock:
|
||
jobs[job_id]["output"] = jobs[job_id].get("output", [])[-500:] + [msg]
|
||
|
||
def is_cancelled():
|
||
with jobs_lock:
|
||
return jobs[job_id]["status"] == "cancelled"
|
||
|
||
try:
|
||
from monochrome.api import download_spotify_url
|
||
success, total, fail_info = download_spotify_url(
|
||
spotify_url=url,
|
||
quality=quality,
|
||
output_dir=str(user_dir),
|
||
log=log,
|
||
cancel_check=is_cancelled,
|
||
)
|
||
with jobs_lock:
|
||
if jobs[job_id]["status"] != "cancelled":
|
||
jobs[job_id]["status"] = "completed" if success > 0 else "failed"
|
||
jobs[job_id]["return_code"] = 0 if success > 0 else 1
|
||
except Exception as e:
|
||
with jobs_lock:
|
||
jobs[job_id]["status"] = "failed"
|
||
jobs[job_id]["output"] = jobs[job_id].get("output", []) + [f"[error] {e}"]
|
||
jobs[job_id]["return_code"] = 1
|
||
|
||
with jobs_lock:
|
||
database.upsert_job(jobs[job_id])
|
||
|
||
|
||
def run_unified_download(job_id: str, url: str, user_id: str):
|
||
user_dir = get_user_downloads_dir(user_id)
|
||
|
||
with jobs_lock:
|
||
jobs[job_id]["status"] = "running"
|
||
|
||
def log(msg):
|
||
with jobs_lock:
|
||
jobs[job_id]["output"] = jobs[job_id].get("output", [])[-500:] + [msg]
|
||
|
||
def is_cancelled():
|
||
with jobs_lock:
|
||
return jobs[job_id]["status"] == "cancelled"
|
||
|
||
try:
|
||
from monochrome.api import download_spotify_url
|
||
success, total, fail_info = download_spotify_url(
|
||
spotify_url=url,
|
||
quality="MP3_320",
|
||
output_dir=str(user_dir),
|
||
log=log,
|
||
cancel_check=is_cancelled,
|
||
)
|
||
with jobs_lock:
|
||
if jobs[job_id]["status"] != "cancelled":
|
||
jobs[job_id]["status"] = "completed" if success > 0 else "failed"
|
||
jobs[job_id]["return_code"] = 0 if success > 0 else 1
|
||
except Exception as e:
|
||
with jobs_lock:
|
||
jobs[job_id]["status"] = "failed"
|
||
jobs[job_id]["output"] = jobs[job_id].get("output", []) + [f"[error] {e}"]
|
||
jobs[job_id]["return_code"] = 1
|
||
with jobs_lock:
|
||
database.upsert_job(jobs[job_id])
|
||
return
|
||
|
||
with jobs_lock:
|
||
cancelled = jobs[job_id]["status"] == "cancelled"
|
||
|
||
failed_urls = fail_info.get("failed_urls", [])
|
||
if cancelled or not failed_urls:
|
||
with jobs_lock:
|
||
database.upsert_job(jobs[job_id])
|
||
return
|
||
|
||
# Spawn Votify fallback job for failed tracks
|
||
fallback_quality = database.get_setting("fallback_quality", "aac-medium")
|
||
|
||
subfolder = fail_info.get("subfolder")
|
||
output_path = str(user_dir / subfolder) if subfolder else str(user_dir)
|
||
|
||
votify_job_id = str(uuid.uuid4())[:8]
|
||
with jobs_lock:
|
||
jobs[votify_job_id] = {
|
||
"id": votify_job_id,
|
||
"user_id": user_id,
|
||
"urls": failed_urls,
|
||
"options": {"audio_quality": fallback_quality, "source": "votify-fallback"},
|
||
"status": "queued",
|
||
"output": [],
|
||
"created_at": time.time(),
|
||
}
|
||
|
||
log(f"[monochrome] {len(failed_urls)} track(s) failed — starting Votify fallback (job {votify_job_id})")
|
||
|
||
with jobs_lock:
|
||
database.upsert_job(jobs[job_id])
|
||
|
||
run_download(votify_job_id, failed_urls, {
|
||
"audio_quality": fallback_quality,
|
||
"output_format": "mp3",
|
||
}, output_path=output_path, user_id=user_id)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Background purge thread
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _purge_loop():
|
||
while True:
|
||
time.sleep(3600)
|
||
try:
|
||
days = int(database.get_setting("job_expiry_days", "30"))
|
||
if days <= 0:
|
||
continue
|
||
cutoff = time.time() - days * 86400
|
||
database.delete_jobs_older_than(cutoff)
|
||
if not DOWNLOADS_DIR.exists():
|
||
continue
|
||
for user_dir in DOWNLOADS_DIR.iterdir():
|
||
if not user_dir.is_dir():
|
||
continue
|
||
files = [f for f in user_dir.rglob("*") if f.is_file()]
|
||
if not files or max(f.stat().st_mtime for f in files) < cutoff:
|
||
shutil.rmtree(user_dir, ignore_errors=True)
|
||
except Exception as e:
|
||
print(f"[purge] {e}")
|
||
|
||
|
||
threading.Thread(target=_purge_loop, daemon=True).start()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Static / offline routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.route("/sw.js")
|
||
def service_worker():
|
||
return send_from_directory("static", "sw.js", mimetype="application/javascript")
|
||
|
||
|
||
@app.route("/offline")
|
||
def offline():
|
||
return send_from_directory("static", "offline.html")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Auth routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.route("/login", methods=["GET", "POST"])
|
||
def login():
|
||
if request.method == "POST":
|
||
username = request.form.get("username", "").strip()
|
||
password = request.form.get("password", "")
|
||
user = database.get_user_by_username(username)
|
||
if user and database.verify_password(user, password):
|
||
session.clear()
|
||
session["user_id"] = user["id"]
|
||
session["username"] = user["username"]
|
||
session["role"] = user["role"]
|
||
database.update_last_login(user["id"])
|
||
return redirect(url_for("index"))
|
||
return render_template("login.html", error="Invalid username or password")
|
||
return render_template("login.html", error=None)
|
||
|
||
|
||
@app.route("/logout")
|
||
def logout():
|
||
session.clear()
|
||
return redirect(url_for("login"))
|
||
|
||
|
||
@app.route("/")
|
||
def index():
|
||
return render_template(
|
||
"index.html",
|
||
username=session.get("username"),
|
||
role=session.get("role"),
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Account routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.route("/api/account/password", methods=["POST"])
|
||
def change_password():
|
||
data = request.json
|
||
user = database.get_user_by_id(session["user_id"])
|
||
if not database.verify_password(user, data.get("current_password", "")):
|
||
return jsonify({"error": "Current password incorrect"}), 400
|
||
new_pw = data.get("new_password", "")
|
||
if not new_pw:
|
||
return jsonify({"error": "New password cannot be empty"}), 400
|
||
database.update_user_password(session["user_id"], new_pw)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Download routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.route("/api/download", methods=["POST"])
|
||
def start_download():
|
||
data = request.json
|
||
urls = [u.strip() for u in data.get("urls", "").split("\n") if u.strip()]
|
||
if not urls:
|
||
return jsonify({"error": "No URLs provided"}), 400
|
||
if not COOKIES_PATH.exists():
|
||
return jsonify({"error": "cookies.txt not found. Mount it to /config/cookies.txt"}), 400
|
||
|
||
options = {
|
||
"audio_quality": data.get("audio_quality", "aac-medium"),
|
||
"download_mode": data.get("download_mode", "ytdlp"),
|
||
"video_format": data.get("video_format", "mp4"),
|
||
"cover_size": data.get("cover_size", "large"),
|
||
"save_cover": data.get("save_cover", False),
|
||
"save_playlist": data.get("save_playlist", False),
|
||
"overwrite": data.get("overwrite", False),
|
||
"download_music_videos": data.get("download_music_videos", False),
|
||
"save_lrc": data.get("save_lrc", False),
|
||
"no_lrc": data.get("no_lrc", False),
|
||
"truncate": data.get("truncate"),
|
||
"output_format": data.get("output_format", "original"),
|
||
}
|
||
|
||
user_id = session["user_id"]
|
||
job_id = str(uuid.uuid4())[:8]
|
||
with jobs_lock:
|
||
jobs[job_id] = {
|
||
"id": job_id,
|
||
"user_id": user_id,
|
||
"urls": urls,
|
||
"options": options,
|
||
"status": "queued",
|
||
"output": [],
|
||
"created_at": time.time(),
|
||
}
|
||
|
||
thread = threading.Thread(
|
||
target=run_download, args=(job_id, urls, options, None, user_id), daemon=True
|
||
)
|
||
thread.start()
|
||
return jsonify({"job_id": job_id})
|
||
|
||
|
||
@app.route("/api/monochrome/download", methods=["POST"])
|
||
def start_monochrome_download():
|
||
data = request.json
|
||
url = data.get("url", "").strip()
|
||
quality = data.get("quality", "HI_RES_LOSSLESS")
|
||
if not url:
|
||
return jsonify({"error": "No URL provided"}), 400
|
||
|
||
valid_qualities = ["HI_RES_LOSSLESS", "LOSSLESS", "HIGH", "LOW", "MP3_320"]
|
||
if quality not in valid_qualities:
|
||
return jsonify({"error": f"Invalid quality. Choose from: {valid_qualities}"}), 400
|
||
|
||
user_id = session["user_id"]
|
||
job_id = str(uuid.uuid4())[:8]
|
||
with jobs_lock:
|
||
jobs[job_id] = {
|
||
"id": job_id,
|
||
"user_id": user_id,
|
||
"urls": [url],
|
||
"options": {"quality": quality, "source": "monochrome"},
|
||
"status": "queued",
|
||
"output": [],
|
||
"created_at": time.time(),
|
||
}
|
||
|
||
thread = threading.Thread(
|
||
target=run_monochrome_download, args=(job_id, url, quality, user_id), daemon=True
|
||
)
|
||
thread.start()
|
||
return jsonify({"job_id": job_id})
|
||
|
||
|
||
@app.route("/api/unified/download", methods=["POST"])
|
||
def start_unified_download():
|
||
data = request.json
|
||
url = data.get("url", "").strip()
|
||
if not url:
|
||
return jsonify({"error": "No URL provided"}), 400
|
||
|
||
user_id = session["user_id"]
|
||
job_id = str(uuid.uuid4())[:8]
|
||
with jobs_lock:
|
||
jobs[job_id] = {
|
||
"id": job_id,
|
||
"user_id": user_id,
|
||
"urls": [url],
|
||
"options": {"source": "unified"},
|
||
"status": "queued",
|
||
"output": [],
|
||
"created_at": time.time(),
|
||
}
|
||
|
||
thread = threading.Thread(
|
||
target=run_unified_download, args=(job_id, url, user_id), daemon=True
|
||
)
|
||
thread.start()
|
||
return jsonify({"job_id": job_id})
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Settings routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.route("/api/settings", methods=["GET"])
|
||
def get_settings():
|
||
return jsonify({
|
||
"fallback_quality": database.get_setting("fallback_quality", "aac-medium"),
|
||
"job_expiry_days": int(database.get_setting("job_expiry_days", "30")),
|
||
})
|
||
|
||
|
||
@app.route("/api/settings", methods=["POST"])
|
||
def save_settings():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
data = request.json
|
||
allowed_qualities = ["aac-medium", "aac-high"]
|
||
quality = data.get("fallback_quality", "aac-medium")
|
||
if quality not in allowed_qualities:
|
||
quality = "aac-medium"
|
||
database.set_setting("fallback_quality", quality)
|
||
if "job_expiry_days" in data:
|
||
try:
|
||
days = max(0, int(data["job_expiry_days"]))
|
||
except (ValueError, TypeError):
|
||
days = 30
|
||
database.set_setting("job_expiry_days", str(days))
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Artwork routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.route("/api/artwork", methods=["GET"])
|
||
def get_artwork():
|
||
from monochrome.spotify_to_ids import parse_spotify_url, fetch_spotify_embed
|
||
spotify_url = request.args.get("url", "").strip()
|
||
if not spotify_url:
|
||
return jsonify({"error": "No URL provided"}), 400
|
||
|
||
sp_type, sp_id = parse_spotify_url(spotify_url)
|
||
if not sp_type:
|
||
return jsonify({"error": "Invalid Spotify URL"}), 400
|
||
|
||
embed_data = fetch_spotify_embed(sp_type, sp_id)
|
||
if not embed_data:
|
||
return jsonify({"error": "Could not fetch Spotify metadata"}), 502
|
||
|
||
try:
|
||
entity = embed_data["props"]["pageProps"]["state"]["data"]["entity"]
|
||
except (KeyError, TypeError):
|
||
return jsonify({"error": "Unexpected Spotify response format"}), 502
|
||
|
||
# entity.visualIdentity.image[] — confirmed structure from Spotify embed page
|
||
# Each entry: {"url": "https://image-cdn-ak.spotifycdn.com/image/...", "maxWidth": N, "maxHeight": N}
|
||
images = (entity.get("visualIdentity") or {}).get("image", [])
|
||
if not images:
|
||
app.logger.warning("Artwork not found. Entity keys: %s", list(entity.keys()))
|
||
return jsonify({"error": "No artwork found for this URL"}), 404
|
||
|
||
best = max(images, key=lambda img: img.get("maxWidth", 0))
|
||
url = best["url"]
|
||
|
||
# Upscale to 2000×2000 using the same CDN key technique as votify-fix.
|
||
# Spotify CDN filenames: first 16 hex chars = size key, remainder = image hash.
|
||
# Source: GladistonXD/votify-fix constants.py COVER_SIZE_X_KEY_MAPPING_SONG
|
||
EXTRA_LARGE_KEY = "ab67616d000082c1"
|
||
parts = url.rsplit("/", 1)
|
||
if len(parts) == 2 and len(parts[1]) >= 16:
|
||
url = parts[0] + "/" + EXTRA_LARGE_KEY + parts[1][16:]
|
||
|
||
return jsonify({"image_url": url})
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Job routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def job_to_dict(job):
|
||
return {k: v for k, v in job.items() if k != "process"}
|
||
|
||
|
||
@app.route("/api/jobs")
|
||
def list_jobs():
|
||
user_id = session["user_id"]
|
||
db_jobs = {j["id"]: j for j in database.list_jobs_for_user(user_id)}
|
||
with jobs_lock:
|
||
live = {jid: job_to_dict(j) for jid, j in jobs.items()
|
||
if j.get("user_id") == user_id}
|
||
db_jobs.update(live)
|
||
return jsonify(sorted(db_jobs.values(), key=lambda j: j["created_at"], reverse=True))
|
||
|
||
|
||
@app.route("/api/jobs/<job_id>")
|
||
def get_job(job_id):
|
||
user_id = session["user_id"]
|
||
with jobs_lock:
|
||
job = jobs.get(job_id)
|
||
if job and job.get("user_id") == user_id:
|
||
return jsonify(job_to_dict(job))
|
||
job = database.get_job(job_id)
|
||
if not job or job["user_id"] != user_id:
|
||
return jsonify({"error": "Job not found"}), 404
|
||
return jsonify(job)
|
||
|
||
|
||
@app.route("/api/jobs/<job_id>/cancel", methods=["POST"])
|
||
def cancel_job(job_id):
|
||
user_id = session["user_id"]
|
||
with jobs_lock:
|
||
job = jobs.get(job_id)
|
||
if not job or job.get("user_id") != user_id:
|
||
return jsonify({"error": "Job not found"}), 404
|
||
if job["status"] != "running":
|
||
return jsonify({"error": "Job is not running"}), 400
|
||
job["status"] = "cancelled"
|
||
proc = job.get("process")
|
||
if proc:
|
||
proc.terminate()
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/jobs/<job_id>", methods=["DELETE"])
|
||
def delete_job(job_id):
|
||
user_id = session["user_id"]
|
||
with jobs_lock:
|
||
job = jobs.get(job_id)
|
||
if job and job.get("user_id") == user_id:
|
||
if job["status"] == "running":
|
||
return jsonify({"error": "Cancel the job first"}), 400
|
||
del jobs[job_id]
|
||
database.delete_job(job_id)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# File routes (scoped to current user)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _resolve_user_path(user_id: str, rel_path: str):
|
||
"""Returns (user_dir, target) or raises ValueError on traversal."""
|
||
user_dir = get_user_downloads_dir(user_id)
|
||
target = (user_dir / rel_path).resolve()
|
||
if not str(target).startswith(str(user_dir.resolve())):
|
||
raise ValueError("Invalid path")
|
||
return user_dir, target
|
||
|
||
|
||
@app.route("/api/files")
|
||
def list_files():
|
||
user_id = session["user_id"]
|
||
rel_path = request.args.get("path", "")
|
||
try:
|
||
user_dir, target = _resolve_user_path(user_id, rel_path)
|
||
except ValueError:
|
||
return jsonify({"error": "Invalid path"}), 403
|
||
|
||
if not target.exists():
|
||
return jsonify([])
|
||
|
||
items = []
|
||
try:
|
||
for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
|
||
rel = entry.relative_to(user_dir)
|
||
items.append({
|
||
"name": entry.name,
|
||
"path": str(rel).replace("\\", "/"),
|
||
"is_dir": entry.is_dir(),
|
||
"size": entry.stat().st_size if entry.is_file() else None,
|
||
})
|
||
except PermissionError:
|
||
pass
|
||
return jsonify(items)
|
||
|
||
|
||
@app.route("/api/files/download")
|
||
def download_file():
|
||
user_id = session["user_id"]
|
||
rel_path = request.args.get("path", "")
|
||
try:
|
||
_, target = _resolve_user_path(user_id, rel_path)
|
||
except ValueError:
|
||
return jsonify({"error": "Invalid path"}), 403
|
||
if not target.is_file():
|
||
return jsonify({"error": "File not found"}), 404
|
||
return send_from_directory(target.parent, target.name, as_attachment=True)
|
||
|
||
|
||
@app.route("/api/files/download-folder")
|
||
def download_folder():
|
||
user_id = session["user_id"]
|
||
rel_path = request.args.get("path", "")
|
||
try:
|
||
_, target = _resolve_user_path(user_id, rel_path)
|
||
except ValueError:
|
||
return jsonify({"error": "Invalid path"}), 403
|
||
if not target.is_dir():
|
||
return jsonify({"error": "Folder not found"}), 404
|
||
|
||
buf = io.BytesIO()
|
||
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||
for file in target.rglob("*"):
|
||
if file.is_file():
|
||
zf.write(file, file.relative_to(target))
|
||
buf.seek(0)
|
||
|
||
folder_name = target.name or "downloads"
|
||
return app.response_class(
|
||
buf.getvalue(),
|
||
mimetype="application/zip",
|
||
headers={"Content-Disposition": f'attachment; filename="{folder_name}.zip"'},
|
||
)
|
||
|
||
|
||
@app.route("/api/files/delete", methods=["DELETE"])
|
||
def delete_path():
|
||
user_id = session["user_id"]
|
||
rel_path = request.args.get("path", "")
|
||
if not rel_path:
|
||
return jsonify({"error": "Cannot delete root"}), 400
|
||
try:
|
||
user_dir, target = _resolve_user_path(user_id, rel_path)
|
||
except ValueError:
|
||
return jsonify({"error": "Invalid path"}), 403
|
||
if not target.exists():
|
||
return jsonify({"error": "Not found"}), 404
|
||
try:
|
||
if target.is_dir():
|
||
shutil.rmtree(target)
|
||
else:
|
||
target.unlink()
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cookies / WVD routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.route("/api/cookies", methods=["GET"])
|
||
def check_cookies():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
return jsonify({"exists": COOKIES_PATH.exists()})
|
||
|
||
|
||
@app.route("/api/cookies", methods=["POST"])
|
||
def upload_cookies():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
if "file" not in request.files:
|
||
return jsonify({"error": "No file uploaded"}), 400
|
||
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
|
||
request.files["file"].save(COOKIES_PATH)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/wvd", methods=["GET"])
|
||
def check_wvd():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
return jsonify({"exists": WVD_PATH.exists()})
|
||
|
||
|
||
@app.route("/api/wvd", methods=["POST"])
|
||
def upload_wvd():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
if "file" not in request.files:
|
||
return jsonify({"error": "No file uploaded"}), 400
|
||
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
|
||
request.files["file"].save(WVD_PATH)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Admin routes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _admin_resolve_path(user_id: str, rel_path: str):
|
||
user_dir = get_user_downloads_dir(user_id)
|
||
target = (user_dir / rel_path).resolve()
|
||
if not str(target).startswith(str(user_dir.resolve())):
|
||
raise ValueError("Invalid path")
|
||
return user_dir, target
|
||
|
||
|
||
@app.route("/api/admin/users", methods=["GET"])
|
||
def admin_list_users():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
return jsonify(database.list_users())
|
||
|
||
|
||
@app.route("/api/admin/users", methods=["POST"])
|
||
def admin_create_user():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
data = request.json
|
||
username = (data.get("username") or "").strip()
|
||
password = data.get("password") or ""
|
||
role = data.get("role", "user")
|
||
if not username or not password:
|
||
return jsonify({"error": "username and password required"}), 400
|
||
if role not in ("admin", "user"):
|
||
role = "user"
|
||
try:
|
||
user = database.create_user(username, password, role)
|
||
except ValueError as e:
|
||
return jsonify({"error": str(e)}), 409
|
||
return jsonify({"ok": True, "user": {k: user[k] for k in ("id", "username", "role")}})
|
||
|
||
|
||
@app.route("/api/admin/users/<user_id>", methods=["DELETE"])
|
||
def admin_delete_user(user_id):
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
if user_id == session["user_id"]:
|
||
return jsonify({"error": "Cannot delete your own account"}), 400
|
||
# Cancel and evict any live in-memory jobs belonging to this user
|
||
with jobs_lock:
|
||
for job in list(jobs.values()):
|
||
if job.get("user_id") == user_id:
|
||
proc = job.get("process")
|
||
if proc:
|
||
try:
|
||
proc.terminate()
|
||
except Exception:
|
||
pass
|
||
del jobs[job["id"]]
|
||
database.delete_user(user_id) # cascades to jobs table
|
||
user_dir = DOWNLOADS_DIR / user_id
|
||
if user_dir.exists():
|
||
shutil.rmtree(user_dir, ignore_errors=True)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/admin/users/<user_id>/password", methods=["POST"])
|
||
def admin_reset_user_password(user_id):
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
data = request.json
|
||
new_password = (data or {}).get("new_password", "")
|
||
if not new_password:
|
||
return jsonify({"error": "Password cannot be empty"}), 400
|
||
if not database.get_user_by_id(user_id):
|
||
return jsonify({"error": "User not found"}), 404
|
||
database.update_user_password(user_id, new_password)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/admin/users/<user_id>/jobs", methods=["GET"])
|
||
def admin_list_user_jobs(user_id):
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
db_jobs = {j["id"]: j for j in database.list_jobs_for_user(user_id)}
|
||
with jobs_lock:
|
||
live = {jid: job_to_dict(j) for jid, j in jobs.items()
|
||
if j.get("user_id") == user_id}
|
||
db_jobs.update(live)
|
||
return jsonify(sorted(db_jobs.values(), key=lambda j: j["created_at"], reverse=True))
|
||
|
||
|
||
@app.route("/api/admin/jobs/<job_id>", methods=["DELETE"])
|
||
def admin_delete_job(job_id):
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
with jobs_lock:
|
||
if job_id in jobs:
|
||
if jobs[job_id]["status"] == "running":
|
||
return jsonify({"error": "Cancel the job first"}), 400
|
||
del jobs[job_id]
|
||
database.delete_job(job_id)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/admin/files")
|
||
def admin_list_files():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
user_id = request.args.get("user_id", "")
|
||
rel_path = request.args.get("path", "")
|
||
if not user_id:
|
||
return jsonify({"error": "user_id required"}), 400
|
||
try:
|
||
user_dir, target = _admin_resolve_path(user_id, rel_path)
|
||
except ValueError:
|
||
return jsonify({"error": "Invalid path"}), 403
|
||
|
||
if not target.exists():
|
||
return jsonify([])
|
||
items = []
|
||
try:
|
||
for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
|
||
rel = entry.relative_to(user_dir)
|
||
items.append({
|
||
"name": entry.name,
|
||
"path": str(rel).replace("\\", "/"),
|
||
"is_dir": entry.is_dir(),
|
||
"size": entry.stat().st_size if entry.is_file() else None,
|
||
})
|
||
except PermissionError:
|
||
pass
|
||
return jsonify(items)
|
||
|
||
|
||
@app.route("/api/admin/files/download")
|
||
def admin_download_file():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
user_id = request.args.get("user_id", "")
|
||
rel_path = request.args.get("path", "")
|
||
try:
|
||
_, target = _admin_resolve_path(user_id, rel_path)
|
||
except ValueError:
|
||
return jsonify({"error": "Invalid path"}), 403
|
||
if not target.is_file():
|
||
return jsonify({"error": "File not found"}), 404
|
||
return send_from_directory(target.parent, target.name, as_attachment=True)
|
||
|
||
|
||
@app.route("/api/admin/files/download-folder")
|
||
def admin_download_folder():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
user_id = request.args.get("user_id", "")
|
||
rel_path = request.args.get("path", "")
|
||
try:
|
||
_, target = _admin_resolve_path(user_id, rel_path)
|
||
except ValueError:
|
||
return jsonify({"error": "Invalid path"}), 403
|
||
if not target.is_dir():
|
||
return jsonify({"error": "Folder not found"}), 404
|
||
|
||
buf = io.BytesIO()
|
||
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||
for file in target.rglob("*"):
|
||
if file.is_file():
|
||
zf.write(file, file.relative_to(target))
|
||
buf.seek(0)
|
||
folder_name = target.name or "downloads"
|
||
return app.response_class(
|
||
buf.getvalue(),
|
||
mimetype="application/zip",
|
||
headers={"Content-Disposition": f'attachment; filename="{folder_name}.zip"'},
|
||
)
|
||
|
||
|
||
@app.route("/api/admin/files/delete", methods=["DELETE"])
|
||
def admin_delete_file():
|
||
guard = require_admin()
|
||
if guard:
|
||
return guard
|
||
user_id = request.args.get("user_id", "")
|
||
rel_path = request.args.get("path", "")
|
||
if not rel_path:
|
||
return jsonify({"error": "Cannot delete root"}), 400
|
||
try:
|
||
_, target = _admin_resolve_path(user_id, rel_path)
|
||
except ValueError:
|
||
return jsonify({"error": "Invalid path"}), 403
|
||
if not target.exists():
|
||
return jsonify({"error": "Not found"}), 404
|
||
try:
|
||
if target.is_dir():
|
||
shutil.rmtree(target)
|
||
else:
|
||
target.unlink()
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
if __name__ == "__main__":
|
||
app.run(host="0.0.0.0", port=5000, debug=False)
|