diff --git a/.gitignore b/.gitignore index c3b3c4d..2f7d50e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /config/ /downloads/ .env +__pycache__/ diff --git a/Dockerfile b/Dockerfile index 8bc7849..5d0b711 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,6 +23,7 @@ COPY requirements.txt /app/requirements.txt RUN pip install --no-cache-dir -r /app/requirements.txt COPY app.py /app/app.py +COPY utils.py /app/utils.py COPY templates /app/templates COPY static /app/static COPY monochrome /app/monochrome diff --git a/app.py b/app.py index c3084f5..2532f98 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,5 @@ import io +import json import os import shutil import subprocess @@ -8,6 +9,8 @@ import uuid import zipfile from pathlib import Path +from utils import rename_from_metadata, sanitize_filename, cleanup_empty_dirs + from flask import ( Flask, jsonify, @@ -88,15 +91,67 @@ def convert_to_mp3(job_id: str, before: set[Path]): log("[mp3] Conversion complete.") -def run_download(job_id: str, urls: list[str], options: dict): +AUDIO_EXTS = {".m4a", ".ogg", ".opus", ".mp3", ".flac"} + + +def post_process_votify_files(target_dir, job_id): + """Flatten subdirs, rename files from metadata, wrap single tracks.""" + target_dir = Path(target_dir) + + def log(msg): + with jobs_lock: + jobs[job_id]["output"] = jobs[job_id].get("output", [])[-500:] + [msg] + + # 1. Move any files in subdirectories up to target_dir + for ext in AUDIO_EXTS: + for f in list(target_dir.rglob(f"*{ext}")): + if f.parent != target_dir: + dest = target_dir / f.name + counter = 2 + while dest.exists(): + dest = target_dir / f"{f.stem} ({counter}){f.suffix}" + counter += 1 + shutil.move(str(f), str(dest)) + + # 2. Rename from metadata + renamed = [] + for ext in AUDIO_EXTS: + for f in list(target_dir.glob(f"*{ext}")): + new_path = rename_from_metadata(f) + renamed.append(new_path) + if new_path != f: + log(f"[post] Renamed: {new_path.name}") + + # 3. Clean up empty subdirs + cleanup_empty_dirs(target_dir) + + # 4. Single-track wrapping (only if downloading to root downloads dir) + if len(renamed) == 1 and target_dir == DOWNLOADS_DIR: + f = renamed[0] + folder_name = sanitize_filename(f.stem) + wrapper = target_dir / folder_name + wrapper.mkdir(exist_ok=True) + shutil.move(str(f), str(wrapper / f.name)) + log(f"[post] Wrapped single track in folder: {folder_name}") + + return renamed + + +def run_download(job_id: str, urls: list[str], options: dict, output_path: str = None): cmd = ["votify"] cmd.extend(["--cookies-path", str(COOKIES_PATH)]) - cmd.extend(["--output-path", str(DOWNLOADS_DIR)]) + cmd.extend(["--output-path", output_path or str(DOWNLOADS_DIR)]) cmd.extend(["--temp-path", str(TEMP_DIR)]) if WVD_PATH.exists(): cmd.extend(["--wvd-path", str(WVD_PATH)]) + # Flatten folder structure — no nested Artist/Album subdirectories + cmd.extend(["--template-folder-album", "."]) + cmd.extend(["--template-folder-compilation", "."]) + cmd.extend(["--template-folder-episode", "."]) + cmd.extend(["--template-folder-music-video", "."]) + quality = options.get("audio_quality", "aac-medium") if quality: cmd.extend(["--audio-quality", quality]) @@ -174,8 +229,11 @@ def run_download(job_id: str, urls: list[str], options: dict): with jobs_lock: jobs[job_id]["output"] = jobs[job_id].get("output", []) + ["[cancelled] Job was cancelled by user."] else: - if process.returncode == 0 and want_mp3: - convert_to_mp3(job_id, files_before) + if process.returncode == 0: + target = Path(output_path) if output_path else DOWNLOADS_DIR + post_process_votify_files(target, job_id) + if want_mp3: + convert_to_mp3(job_id, files_before) with jobs_lock: jobs[job_id]["status"] = "completed" if process.returncode == 0 else "failed" jobs[job_id]["return_code"] = process.returncode @@ -273,7 +331,7 @@ def run_monochrome_download(job_id: str, url: str, quality: str): try: from monochrome.api import download_spotify_url - success, total = download_spotify_url( + success, total, fail_info = download_spotify_url( spotify_url=url, quality=quality, output_dir=str(DOWNLOADS_DIR), @@ -323,6 +381,131 @@ def start_monochrome_download(): return jsonify({"job_id": job_id}) +# --- Unified download (Monochrome → Votify fallback) --- + +SETTINGS_PATH = CONFIG_DIR / "settings.json" + + +def load_settings(): + if SETTINGS_PATH.exists(): + try: + return json.loads(SETTINGS_PATH.read_text()) + except (json.JSONDecodeError, OSError): + pass + return {"fallback_quality": "aac-medium"} + + +def run_unified_download(job_id: str, url: str): + with jobs_lock: + jobs[job_id]["status"] = "running" + + def log(msg): + with jobs_lock: + jobs[job_id]["output"] = jobs[job_id].get("output", [])[-500:] + [msg] + + def is_cancelled(): + with jobs_lock: + return jobs[job_id]["status"] == "cancelled" + + try: + from monochrome.api import download_spotify_url + success, total, fail_info = download_spotify_url( + spotify_url=url, + quality="MP3_320", + output_dir=str(DOWNLOADS_DIR), + log=log, + cancel_check=is_cancelled, + ) + with jobs_lock: + if jobs[job_id]["status"] != "cancelled": + jobs[job_id]["status"] = "completed" if success > 0 else "failed" + jobs[job_id]["return_code"] = 0 if success > 0 else 1 + except Exception as e: + with jobs_lock: + jobs[job_id]["status"] = "failed" + jobs[job_id]["output"] = jobs[job_id].get("output", []) + [f"[error] {e}"] + jobs[job_id]["return_code"] = 1 + return + + # Check if we should spawn Votify fallback + with jobs_lock: + cancelled = jobs[job_id]["status"] == "cancelled" + + failed_urls = fail_info.get("failed_urls", []) + if cancelled or not failed_urls: + return + + # Spawn Votify fallback job for failed tracks + settings = load_settings() + fallback_quality = settings.get("fallback_quality", "aac-medium") + + subfolder = fail_info.get("subfolder") + output_path = str(DOWNLOADS_DIR / subfolder) if subfolder else str(DOWNLOADS_DIR) + + votify_job_id = str(uuid.uuid4())[:8] + with jobs_lock: + jobs[votify_job_id] = { + "id": votify_job_id, + "urls": failed_urls, + "options": {"audio_quality": fallback_quality, "source": "votify-fallback"}, + "status": "queued", + "output": [], + "created_at": time.time(), + } + + log(f"[monochrome] {len(failed_urls)} track(s) failed — starting Votify fallback (job {votify_job_id})") + + run_download(votify_job_id, failed_urls, { + "audio_quality": fallback_quality, + "output_format": "mp3", + }, output_path=output_path) + + +@app.route("/api/unified/download", methods=["POST"]) +def start_unified_download(): + data = request.json + url = data.get("url", "").strip() + + if not url: + return jsonify({"error": "No URL provided"}), 400 + + job_id = str(uuid.uuid4())[:8] + with jobs_lock: + jobs[job_id] = { + "id": job_id, + "urls": [url], + "options": {"source": "unified"}, + "status": "queued", + "output": [], + "created_at": time.time(), + } + + thread = threading.Thread( + target=run_unified_download, args=(job_id, url), daemon=True + ) + thread.start() + + return jsonify({"job_id": job_id}) + + +@app.route("/api/settings", methods=["GET"]) +def get_settings(): + return jsonify(load_settings()) + + +@app.route("/api/settings", methods=["POST"]) +def save_settings(): + data = request.json + allowed_qualities = ["aac-medium", "aac-high"] + quality = data.get("fallback_quality", "aac-medium") + if quality not in allowed_qualities: + quality = "aac-medium" + settings = {"fallback_quality": quality} + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + SETTINGS_PATH.write_text(json.dumps(settings)) + return jsonify({"ok": True}) + + def job_to_dict(job): return {k: v for k, v in job.items() if k != "process"} diff --git a/monochrome/api.py b/monochrome/api.py index ebe808f..bb473da 100644 --- a/monochrome/api.py +++ b/monochrome/api.py @@ -6,7 +6,9 @@ for use from app.py background threads. """ import os +import shutil import time +from pathlib import Path from monochrome import discover_instances from monochrome.spotify_to_ids import ( @@ -16,6 +18,8 @@ from monochrome.spotify_to_ids import ( extract_tracks, search_monochrome, find_best_match, + similarity, + normalize, ) from monochrome.download import ( get_stream_url_tidal, @@ -23,9 +27,59 @@ from monochrome.download import ( download_file, fetch_cover_art, embed_metadata, - sanitize_filename, convert_to_mp3, ) +from utils import sanitize_filename, rename_from_metadata + + +def verify_match(spotify_track, tidal_info, search_match, log): + """Cross-reference Spotify metadata against Tidal track data. + Falls back to search_match fields when tidal_info has empty values. + Returns True if the track is a valid match.""" + sp_title = spotify_track.get("title", "") + + # Prefer tidal_info title, fall back to search match title + ti_title = tidal_info.get("title", "") if tidal_info else "" + if not ti_title: + ti_title = search_match.get("title", "") if search_match else "" + if sp_title and ti_title: + title_sim = similarity(sp_title, ti_title) + if title_sim < 0.5: + log(f"[monochrome] Title mismatch: '{sp_title}' vs '{ti_title}' (sim={title_sim:.2f})") + return False + + sp_artist = spotify_track.get("artist", "") + + # Prefer tidal_info artist, fall back to search match artist + ti_artist = "" + for src in (tidal_info, search_match): + if not src: + continue + artist_obj = src.get("artist", {}) + ti_artist = artist_obj.get("name", "") if isinstance(artist_obj, dict) else str(artist_obj) + if ti_artist: + break + if sp_artist and ti_artist: + artist_sim = similarity(sp_artist, ti_artist) + # For multi-artist tracks, Tidal may only return the primary artist. + # Check if Tidal artist tokens are a subset of Spotify artist tokens. + sp_tokens = set(normalize(sp_artist).split()) + ti_tokens = set(normalize(ti_artist).split()) + is_subset = ti_tokens and ti_tokens.issubset(sp_tokens) + if artist_sim < 0.4 and not is_subset: + log(f"[monochrome] Artist mismatch: '{sp_artist}' vs '{ti_artist}' (sim={artist_sim:.2f})") + return False + + # Duration check (strongest signal) + sp_dur = spotify_track.get("duration") # milliseconds + ti_dur = (tidal_info or {}).get("duration") # seconds + if sp_dur and ti_dur: + sp_seconds = sp_dur / 1000 + if abs(sp_seconds - ti_dur) > 5: + log(f"[monochrome] Duration mismatch: {sp_seconds:.0f}s vs {ti_dur}s") + return False + + return True def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_check=None): @@ -39,7 +93,8 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec cancel_check: Callback () -> bool, returns True if cancelled Returns: - (success_count, total_tracks) + (success_count, total_tracks, fail_info) where fail_info is: + {"failed_urls": [...], "subfolder": "name" or None} """ if log is None: log = print @@ -54,10 +109,12 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec instances = discover_instances(log=log) # Step 2: Parse Spotify URL + fail_info = {"failed_urls": [], "subfolder": None} + sp_type, sp_id = parse_spotify_url(spotify_url) if not sp_type: log(f"[monochrome] Invalid Spotify URL: {spotify_url}") - return 0, 0 + return 0, 0, fail_info # Step 3: Fetch track list from Spotify log(f"[monochrome] Fetching Spotify {sp_type}: {sp_id}") @@ -66,25 +123,29 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec if not tracks: log(f"[monochrome] Could not extract tracks from {spotify_url}") - return 0, 0 + return 0, 0, fail_info total = len(tracks) log(f"[monochrome] Found {total} track(s) on Spotify") # Create subfolder for albums/playlists dl_dir = output_dir + subfolder_name = None if total > 1: collection_name = extract_collection_name(embed_data, sp_type) if collection_name: - folder_name = sanitize_filename(collection_name) + subfolder_name = sanitize_filename(collection_name) else: - folder_name = sanitize_filename(f"{sp_type}_{sp_id}") - dl_dir = os.path.join(output_dir, folder_name) + subfolder_name = sanitize_filename(f"{sp_type}_{sp_id}") + dl_dir = os.path.join(output_dir, subfolder_name) os.makedirs(dl_dir, exist_ok=True) - log(f"[monochrome] Saving to folder: {folder_name}") + log(f"[monochrome] Saving to folder: {subfolder_name}") + fail_info["subfolder"] = subfolder_name success = 0 failed_tracks = [] + failed_urls = [] + last_final_path = None for i, track in enumerate(tracks): if cancel_check(): @@ -101,6 +162,9 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec if not match: log(f"[monochrome] No match found for: {query}") failed_tracks.append(query) + sp_id = track.get("sp_id") + if sp_id: + failed_urls.append(f"https://open.spotify.com/track/{sp_id}") if i < total - 1: time.sleep(0.5) continue @@ -121,6 +185,9 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec if not stream_url: log(f"[monochrome] Failed to get stream for: {query}") failed_tracks.append(query) + sp_id = track.get("sp_id") + if sp_id: + failed_urls.append(f"https://open.spotify.com/track/{sp_id}") if i < total - 1: time.sleep(0.5) continue @@ -133,7 +200,17 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec if k not in info or not info[k]: info[k] = v - # Determine file extension and path + # Verify match against Spotify metadata + if not verify_match(track, track_data, match, log): + failed_tracks.append(query) + sp_id = track.get("sp_id") + if sp_id: + failed_urls.append(f"https://open.spotify.com/track/{sp_id}") + if i < total - 1: + time.sleep(0.5) + continue + + # Determine file extension and path (temp name until metadata rename) if want_mp3: ext = ".flac" elif api_quality in ("HIGH", "LOW"): @@ -150,6 +227,9 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec except Exception as e: log(f"[monochrome] Download failed for {query}: {e}") failed_tracks.append(query) + sp_id = track.get("sp_id") + if sp_id: + failed_urls.append(f"https://open.spotify.com/track/{sp_id}") if i < total - 1: time.sleep(0.5) continue @@ -160,17 +240,32 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec # Convert to MP3 if requested if want_mp3: - mp3_filename = sanitize_filename(f"{m_artist} - {m_title}.mp3") - mp3_path = os.path.join(dl_dir, mp3_filename) + mp3_path = os.path.join(dl_dir, sanitize_filename(f"{m_artist} - {m_title}.mp3")) if convert_to_mp3(file_path, mp3_path, log=log): embed_metadata(mp3_path, info, cover_data, log=log) + file_path = mp3_path + + # Rename from embedded metadata for consistent naming + final_path = rename_from_metadata(file_path) + log(f"[monochrome] Saved: {final_path.name}") success += 1 + last_final_path = final_path # Rate limit between tracks if i < total - 1: time.sleep(0.5) + # Wrap single tracks in a Title - Artist folder + if total == 1 and success == 1 and last_final_path: + folder_name = sanitize_filename(last_final_path.stem) + wrapper_dir = os.path.join(output_dir, folder_name) + os.makedirs(wrapper_dir, exist_ok=True) + shutil.move(str(last_final_path), os.path.join(wrapper_dir, last_final_path.name)) + subfolder_name = folder_name + fail_info["subfolder"] = subfolder_name + log(f"[monochrome] Saved to folder: {folder_name}") + # Summary if failed_tracks: log(f"[monochrome] Failed tracks ({len(failed_tracks)}):") @@ -178,4 +273,5 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec log(f"[monochrome] - {ft}") log(f"[monochrome] Complete: {success}/{total} tracks downloaded") - return success, total + fail_info["failed_urls"] = failed_urls + return success, total, fail_info diff --git a/monochrome/download.py b/monochrome/download.py index f28f836..94d934d 100644 --- a/monochrome/download.py +++ b/monochrome/download.py @@ -26,6 +26,7 @@ import subprocess import sys from monochrome import fetch, fetch_json, discover_instances, SSL_CTX, QOBUZ_API +from utils import sanitize_filename def extract_stream_url_from_manifest(manifest_b64, log=None): @@ -360,9 +361,6 @@ def embed_metadata(file_path, info, cover_data=None, log=None): log(f"[!] Failed to embed metadata: {e}") -def sanitize_filename(name): - return re.sub(r'[<>:"/\\|?*]', '_', name) - def convert_to_mp3(input_path, output_path, bitrate="320k", log=None): """Convert audio file to MP3 using ffmpeg.""" diff --git a/monochrome/spotify_to_ids.py b/monochrome/spotify_to_ids.py index 1a50768..c6ff02d 100644 --- a/monochrome/spotify_to_ids.py +++ b/monochrome/spotify_to_ids.py @@ -102,7 +102,7 @@ def extract_tracks(embed_data, sp_type, sp_id): else: artist = entity.get("subtitle", "") if title: - return [{"title": title, "artist": artist}] + return [{"title": title, "artist": artist, "sp_id": sp_id, "duration": entity.get("duration")}] elif sp_type in ("album", "playlist"): track_list = entity.get("trackList", []) @@ -111,8 +111,15 @@ def extract_tracks(embed_data, sp_type, sp_id): for t in track_list: title = t.get("title", "") artist = t.get("subtitle", "") + # Prefer uri (contains real Spotify track ID) over uid (internal hex UID) + track_uid = None + uri = t.get("uri", "") + if uri.startswith("spotify:track:"): + track_uid = uri.split(":")[-1] + if not track_uid: + track_uid = t.get("uid") if title: - tracks.append({"title": title, "artist": artist}) + tracks.append({"title": title, "artist": artist, "sp_id": track_uid, "duration": t.get("duration")}) if tracks: return tracks except (KeyError, TypeError, IndexError): @@ -123,7 +130,7 @@ def extract_tracks(embed_data, sp_type, sp_id): oembed_title = fetch_spotify_oembed(sp_type, sp_id) if oembed_title: print(f'[*] Using oEmbed fallback: "{oembed_title}"', file=sys.stderr) - return [{"title": oembed_title, "artist": ""}] + return [{"title": oembed_title, "artist": "", "sp_id": sp_id, "duration": None}] return [] diff --git a/templates/index.html b/templates/index.html index ce0e148..11cfa40 100644 --- a/templates/index.html +++ b/templates/index.html @@ -145,7 +145,7 @@ .card { padding: 16px; } body { padding-bottom: 64px; } textarea, select, input { font-size: 1rem; } - #btn-download, #btn-monochrome { width: 100%; border-radius: var(--radius); } + #btn-download, #btn-monochrome, #btn-unified { width: 100%; border-radius: var(--radius); } .job-preview { max-width: calc(100vw - 120px); } #toast { left: 16px; right: 16px; bottom: 72px; max-width: none; } #bottom-nav { display: flex; } @@ -154,10 +154,9 @@
Quality for Votify fallback when Monochrome can't find a track
+ +