diff --git a/Dockerfile b/Dockerfile index 4614a67..8bc7849 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,6 +25,7 @@ RUN pip install --no-cache-dir -r /app/requirements.txt COPY app.py /app/app.py COPY templates /app/templates COPY static /app/static +COPY monochrome /app/monochrome WORKDIR /app diff --git a/app.py b/app.py index afd75d8..c3084f5 100644 --- a/app.py +++ b/app.py @@ -259,6 +259,70 @@ def start_download(): return jsonify({"job_id": job_id}) +def run_monochrome_download(job_id: str, url: str, quality: str): + with jobs_lock: + jobs[job_id]["status"] = "running" + + def log(msg): + with jobs_lock: + jobs[job_id]["output"] = jobs[job_id].get("output", [])[-500:] + [msg] + + def is_cancelled(): + with jobs_lock: + return jobs[job_id]["status"] == "cancelled" + + try: + from monochrome.api import download_spotify_url + success, total = download_spotify_url( + spotify_url=url, + quality=quality, + output_dir=str(DOWNLOADS_DIR), + log=log, + cancel_check=is_cancelled, + ) + with jobs_lock: + if jobs[job_id]["status"] != "cancelled": + jobs[job_id]["status"] = "completed" if success > 0 else "failed" + jobs[job_id]["return_code"] = 0 if success > 0 else 1 + except Exception as e: + with jobs_lock: + jobs[job_id]["status"] = "failed" + jobs[job_id]["output"] = jobs[job_id].get("output", []) + [f"[error] {e}"] + jobs[job_id]["return_code"] = 1 + + +@app.route("/api/monochrome/download", methods=["POST"]) +def start_monochrome_download(): + data = request.json + url = data.get("url", "").strip() + quality = data.get("quality", "HI_RES_LOSSLESS") + + if not url: + return jsonify({"error": "No URL provided"}), 400 + + valid_qualities = ["HI_RES_LOSSLESS", "LOSSLESS", "HIGH", "LOW", "MP3_320"] + if quality not in valid_qualities: + return jsonify({"error": f"Invalid quality. Choose from: {valid_qualities}"}), 400 + + job_id = str(uuid.uuid4())[:8] + with jobs_lock: + jobs[job_id] = { + "id": job_id, + "urls": [url], + "options": {"quality": quality, "source": "monochrome"}, + "status": "queued", + "output": [], + "created_at": time.time(), + } + + thread = threading.Thread( + target=run_monochrome_download, args=(job_id, url, quality), daemon=True + ) + thread.start() + + return jsonify({"job_id": job_id}) + + def job_to_dict(job): return {k: v for k, v in job.items() if k != "process"} diff --git a/monochrome/__init__.py b/monochrome/__init__.py new file mode 100644 index 0000000..243ce68 --- /dev/null +++ b/monochrome/__init__.py @@ -0,0 +1,74 @@ +""" +Monochrome - shared utilities for Tidal/Qobuz music downloading via Monochrome API instances. +""" + +import json +import ssl +import sys +import urllib.request +import urllib.error + +# Hardcoded fallback API instances +FALLBACK_INSTANCES = [ + "https://monochrome.tf", + "https://triton.squid.wtf", + "https://qqdl.site", + "https://monochrome.samidy.com", + "https://api.monochrome.tf", +] + +QOBUZ_API = "https://qobuz.squid.wtf/api" +UPTIME_URL = "https://tidal-uptime.jiffy-puffs-1j.workers.dev/" + +# SSL context that doesn't verify certs (some instances have bad certs) +SSL_CTX = ssl.create_default_context() +SSL_CTX.check_hostname = False +SSL_CTX.verify_mode = ssl.CERT_NONE + + +def fetch(url, timeout=15, use_ssl_ctx=True): + """Fetch a URL, returning the response object.""" + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + ctx = SSL_CTX if use_ssl_ctx else None + return urllib.request.urlopen(req, timeout=timeout, context=ctx) + + +def fetch_json(url, timeout=15, use_ssl_ctx=True): + """Fetch a URL and parse JSON response.""" + with fetch(url, timeout, use_ssl_ctx) as resp: + return json.loads(resp.read().decode()) + + +def discover_instances(log=None): + """Get live API instances from uptime monitor, fall back to hardcoded list.""" + if log is None: + log = print + try: + data = fetch_json(UPTIME_URL, timeout=10) + if isinstance(data, dict): + urls = [] + for key, val in data.items(): + if isinstance(val, dict) and val.get("url"): + urls.append(val["url"].rstrip("/")) + elif isinstance(val, str) and val.startswith("http"): + urls.append(val.rstrip("/")) + if urls: + log(f"[*] Discovered {len(urls)} instances from uptime monitor") + return urls + elif isinstance(data, list): + urls = [] + for item in data: + if isinstance(item, str) and item.startswith("http"): + urls.append(item.rstrip("/")) + elif isinstance(item, dict): + u = item.get("url") or item.get("uri") or "" + if u.startswith("http"): + urls.append(u.rstrip("/")) + if urls: + log(f"[*] Discovered {len(urls)} instances from uptime monitor") + return urls + except Exception as e: + log(f"[!] Uptime discovery failed: {e}") + + log(f"[*] Using {len(FALLBACK_INSTANCES)} fallback instances") + return list(FALLBACK_INSTANCES) diff --git a/monochrome/api.py b/monochrome/api.py new file mode 100644 index 0000000..ebe808f --- /dev/null +++ b/monochrome/api.py @@ -0,0 +1,181 @@ +""" +Monochrome API integration for Votify Web. + +Orchestrates the Spotify URL → Tidal ID → download pipeline +for use from app.py background threads. +""" + +import os +import time + +from monochrome import discover_instances +from monochrome.spotify_to_ids import ( + parse_spotify_url, + fetch_spotify_embed, + extract_collection_name, + extract_tracks, + search_monochrome, + find_best_match, +) +from monochrome.download import ( + get_stream_url_tidal, + get_stream_url_qobuz, + download_file, + fetch_cover_art, + embed_metadata, + sanitize_filename, + convert_to_mp3, +) + + +def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_check=None): + """Download tracks from a Spotify URL via Monochrome. + + Args: + spotify_url: Spotify track/album/playlist URL + quality: One of HI_RES_LOSSLESS, LOSSLESS, HIGH, LOW, MP3_320 + output_dir: Directory to save downloaded files + log: Callback (str) -> None for progress messages + cancel_check: Callback () -> bool, returns True if cancelled + + Returns: + (success_count, total_tracks) + """ + if log is None: + log = print + if cancel_check is None: + cancel_check = lambda: False + + want_mp3 = quality == "MP3_320" + api_quality = "LOSSLESS" if want_mp3 else quality + + # Step 1: Discover instances + log("[monochrome] Discovering API instances...") + instances = discover_instances(log=log) + + # Step 2: Parse Spotify URL + sp_type, sp_id = parse_spotify_url(spotify_url) + if not sp_type: + log(f"[monochrome] Invalid Spotify URL: {spotify_url}") + return 0, 0 + + # Step 3: Fetch track list from Spotify + log(f"[monochrome] Fetching Spotify {sp_type}: {sp_id}") + embed_data = fetch_spotify_embed(sp_type, sp_id) + tracks = extract_tracks(embed_data, sp_type, sp_id) + + if not tracks: + log(f"[monochrome] Could not extract tracks from {spotify_url}") + return 0, 0 + + total = len(tracks) + log(f"[monochrome] Found {total} track(s) on Spotify") + + # Create subfolder for albums/playlists + dl_dir = output_dir + if total > 1: + collection_name = extract_collection_name(embed_data, sp_type) + if collection_name: + folder_name = sanitize_filename(collection_name) + else: + folder_name = sanitize_filename(f"{sp_type}_{sp_id}") + dl_dir = os.path.join(output_dir, folder_name) + os.makedirs(dl_dir, exist_ok=True) + log(f"[monochrome] Saving to folder: {folder_name}") + + success = 0 + failed_tracks = [] + + for i, track in enumerate(tracks): + if cancel_check(): + log("[monochrome] Cancelled") + break + + query = f"{track['artist']} {track['title']}".strip() + log(f"[monochrome] Track {i + 1}/{total}: {query}") + + # Search and match + results = search_monochrome(instances, query, log=log) + match, score = find_best_match(results, track["title"], track["artist"]) + + if not match: + log(f"[monochrome] No match found for: {query}") + failed_tracks.append(query) + if i < total - 1: + time.sleep(0.5) + continue + + track_id = match.get("id") + m_title = match.get("title", "?") + m_artist_obj = match.get("artist", {}) + m_artist = m_artist_obj.get("name", "?") if isinstance(m_artist_obj, dict) else str(m_artist_obj) + log(f"[monochrome] Matched: {m_artist} - {m_title} (score: {score:.2f})") + + # Get stream URL + stream_url, track_data = get_stream_url_tidal(instances, track_id, api_quality, log=log) + + if not stream_url: + log("[monochrome] Tidal failed, trying Qobuz...") + stream_url = get_stream_url_qobuz(track_id, api_quality, log=log) + + if not stream_url: + log(f"[monochrome] Failed to get stream for: {query}") + failed_tracks.append(query) + if i < total - 1: + time.sleep(0.5) + continue + + # Build metadata from match info + info = match + if track_data and isinstance(track_data, dict): + # Merge: track_data may have more detail + for k, v in track_data.items(): + if k not in info or not info[k]: + info[k] = v + + # Determine file extension and path + if want_mp3: + ext = ".flac" + elif api_quality in ("HIGH", "LOW"): + ext = ".m4a" + else: + ext = ".flac" + + filename = sanitize_filename(f"{m_artist} - {m_title}{ext}") + file_path = os.path.join(dl_dir, filename) + + # Download + try: + download_file(stream_url, file_path, log=log) + except Exception as e: + log(f"[monochrome] Download failed for {query}: {e}") + failed_tracks.append(query) + if i < total - 1: + time.sleep(0.5) + continue + + # Cover art and metadata + cover_data = fetch_cover_art(info.get("album"), log=log) + embed_metadata(file_path, info, cover_data, log=log) + + # Convert to MP3 if requested + if want_mp3: + mp3_filename = sanitize_filename(f"{m_artist} - {m_title}.mp3") + mp3_path = os.path.join(dl_dir, mp3_filename) + if convert_to_mp3(file_path, mp3_path, log=log): + embed_metadata(mp3_path, info, cover_data, log=log) + + success += 1 + + # Rate limit between tracks + if i < total - 1: + time.sleep(0.5) + + # Summary + if failed_tracks: + log(f"[monochrome] Failed tracks ({len(failed_tracks)}):") + for ft in failed_tracks: + log(f"[monochrome] - {ft}") + + log(f"[monochrome] Complete: {success}/{total} tracks downloaded") + return success, total diff --git a/monochrome/download.py b/monochrome/download.py new file mode 100644 index 0000000..f28f836 --- /dev/null +++ b/monochrome/download.py @@ -0,0 +1,482 @@ +""" +Monochrome song downloader. + +Usage: + python download.py [--quality QUALITY] [--output FILENAME] + +Track IDs can be found in the URL when viewing a track on the site, e.g.: + https://monochrome.app/track/12345678 -> track_id = 12345678 + +Quality options: + HI_RES_LOSSLESS (default, highest available) + LOSSLESS (16-bit/44.1kHz FLAC) + HIGH (AAC 320kbps) + LOW (AAC 96kbps) + MP3_320 (downloads as LOSSLESS, converts to MP3 via ffmpeg) +""" + +import argparse +import base64 +import json +import os +import random +import re +import shutil +import subprocess +import sys + +from monochrome import fetch, fetch_json, discover_instances, SSL_CTX, QOBUZ_API + + +def extract_stream_url_from_manifest(manifest_b64, log=None): + """Decode base64 manifest and extract stream URL (mirrors frontend logic).""" + if log is None: + log = print + try: + decoded = base64.b64decode(manifest_b64).decode("utf-8", errors="replace") + except Exception: + return None + + # Type 1: DASH manifest XML — extract BaseURL + if "(https?://[^<]+)", decoded) + if match: + log("[*] Extracted URL from DASH manifest") + return match.group(1) + log("[!] DASH manifest found but no BaseURL — DASH streaming not supported in CLI") + return None + + # Type 2: JSON with urls array + try: + parsed = json.loads(decoded) + if isinstance(parsed, dict): + urls = parsed.get("urls") + if isinstance(urls, list) and urls: + log("[*] Extracted URL from JSON manifest") + return urls[0] + url = parsed.get("url") + if url: + return url + except (json.JSONDecodeError, ValueError): + pass + + # Type 3: Raw URL in the decoded string + match = re.search(r"https?://[\w\-.~:?#\[@!$&'()*+,;=%/]+", decoded) + if match: + log("[*] Extracted URL from raw manifest") + return match.group(0) + + return None + + +def get_stream_url_tidal(instances, track_id, quality, log=None): + """Get stream URL from Tidal-based instances using /track/ endpoint.""" + if log is None: + log = print + shuffled = list(instances) + random.shuffle(shuffled) + + for base in shuffled: + url = f"{base}/track/?id={track_id}&quality={quality}" + log(f"[*] Trying {base} ...") + try: + with fetch(url) as resp: + status = resp.status + if status == 429: + log(" Rate limited, skipping") + continue + if status >= 400: + log(f" HTTP {status}") + continue + + body = resp.read().decode() + data = json.loads(body) + + # Unwrap {"version": "...", "data": {...}} envelope + if isinstance(data, dict) and "data" in data and "version" in data: + data = data["data"] + + # Response can be an array [trackObj, streamInfo] or an object + if isinstance(data, list) and len(data) >= 2: + track_obj = data[0] + stream_info = data[1] + elif isinstance(data, dict): + track_obj = data + stream_info = data + else: + log(f" Unexpected response format: {type(data)}") + continue + + # Check for OriginalTrackUrl first (direct URL, no decoding needed) + original_url = None + if isinstance(stream_info, dict): + original_url = stream_info.get("OriginalTrackUrl") or stream_info.get("originalTrackUrl") + if isinstance(data, dict): + original_url = original_url or data.get("originalTrackUrl") or data.get("OriginalTrackUrl") + + if original_url: + log(f"[+] Got direct stream URL from {base}") + return original_url, track_obj + + # Fall back to manifest decoding + manifest = None + if isinstance(stream_info, dict): + manifest = stream_info.get("manifest") + if isinstance(data, dict): + manifest = manifest or data.get("manifest") + + if manifest: + stream_url = extract_stream_url_from_manifest(manifest, log=log) + if stream_url: + log(f"[+] Got stream URL from manifest ({base})") + return stream_url, track_obj + else: + log(" Could not extract URL from manifest") + continue + + # Maybe the response itself contains a URL + if isinstance(data, dict): + for key in ("url", "streamUrl", "stream_url"): + if data.get(key): + log(f"[+] Got stream URL from {base}") + return data[key], track_obj + + log(" No stream URL found in response") + if isinstance(data, dict): + log(f" Response keys: {list(data.keys())}") + elif isinstance(data, list): + for i, item in enumerate(data): + if isinstance(item, dict): + log(f" [{i}] keys: {list(item.keys())}") + + except json.JSONDecodeError as e: + log(f" Invalid JSON: {e}") + except Exception as e: + log(f" Failed: {e}") + continue + + return None, None + + +def get_stream_url_qobuz(track_id, quality, log=None): + """Try Qobuz API as an alternative source.""" + if log is None: + log = print + qobuz_quality_map = { + "MP3_320": "27", + "LOSSLESS": "7", + "HI_RES_LOSSLESS": "5", + } + q = qobuz_quality_map.get(quality, "5") + + url = f"{QOBUZ_API}/download-music?track_id={track_id}&quality={q}" + log("[*] Trying Qobuz API ...") + try: + data = fetch_json(url) + if isinstance(data, dict): + if data.get("success") and isinstance(data.get("data"), dict): + stream_url = data["data"].get("url") + if stream_url: + log("[+] Got stream URL from Qobuz") + return stream_url + elif data.get("url"): + log("[+] Got stream URL from Qobuz") + return data["url"] + except Exception as e: + log(f" Qobuz failed: {e}") + + return None + + +def get_track_info(instances, track_id, log=None): + """Fetch track metadata for filename.""" + if log is None: + log = print + shuffled = list(instances) + random.shuffle(shuffled) + for base in shuffled: + for endpoint in [f"/info/?id={track_id}", f"/track/?id={track_id}&quality=LOSSLESS"]: + try: + data = fetch_json(f"{base}{endpoint}") + # Unwrap version/data envelope + if isinstance(data, dict) and "data" in data and "version" in data: + data = data["data"] + if isinstance(data, list) and data: + data = data[0] + if isinstance(data, dict) and data.get("title"): + return data + except Exception: + continue + return None + + +def download_file(url, output_path, log=None): + """Download a file with progress display.""" + if log is None: + log = print + with fetch(url, timeout=120) as resp: + total = resp.headers.get("Content-Length") + total = int(total) if total else None + + with open(output_path, "wb") as f: + downloaded = 0 + last_pct = -1 + while True: + chunk = resp.read(8192) + if not chunk: + break + f.write(chunk) + downloaded += len(chunk) + if total: + pct = int(downloaded / total * 100) + # Only log every 10% to avoid flooding + if pct // 10 > last_pct // 10: + mb = downloaded / (1024 * 1024) + total_mb = total / (1024 * 1024) + log(f"[*] Downloading: {mb:.1f}/{total_mb:.1f} MB ({pct}%)") + last_pct = pct + + log(f"[+] Saved to {output_path}") + + +def fetch_cover_art(album_info, log=None): + """Download album cover art from Tidal CDN. Returns JPEG bytes or None.""" + if log is None: + log = print + if not album_info or not isinstance(album_info, dict): + return None + cover_id = album_info.get("cover") + if not cover_id: + return None + # Tidal CDN uses slashes instead of dashes in the cover UUID + formatted = cover_id.replace("-", "/") + url = f"https://resources.tidal.com/images/{formatted}/1280x1280.jpg" + log("[*] Fetching album art ...") + try: + with fetch(url) as resp: + return resp.read() + except Exception as e: + log(f"[!] Could not fetch cover art: {e}") + return None + + +def embed_metadata(file_path, info, cover_data=None, log=None): + """Embed metadata and cover art into audio file using mutagen.""" + if log is None: + log = print + if not info: + return + + ext = os.path.splitext(file_path)[1].lower() + title = info.get("title", "") + artist_obj = info.get("artist", {}) + artist_name = artist_obj.get("name", "") if isinstance(artist_obj, dict) else str(artist_obj) + # Join all artists for the artist tag + artists = info.get("artists", []) + all_artists = ", ".join(a.get("name", "") for a in artists if isinstance(a, dict)) if artists else artist_name + album_obj = info.get("album", {}) + album_title = album_obj.get("title", "") if isinstance(album_obj, dict) else "" + track_num = info.get("trackNumber") + disc_num = info.get("volumeNumber") + copyright_text = info.get("copyright", "") + isrc = info.get("isrc", "") + release_date = info.get("streamStartDate", "") + year = release_date[:4] if release_date and len(release_date) >= 4 else "" + + log("[*] Embedding metadata ...") + + try: + if ext == ".flac": + from mutagen.flac import FLAC, Picture + audio = FLAC(file_path) + audio["title"] = title + audio["artist"] = all_artists + audio["album"] = album_title + if track_num: + audio["tracknumber"] = str(track_num) + if disc_num: + audio["discnumber"] = str(disc_num) + if copyright_text: + audio["copyright"] = copyright_text + if isrc: + audio["isrc"] = isrc + if year: + audio["date"] = year + if cover_data: + pic = Picture() + pic.type = 3 # front cover + pic.mime = "image/jpeg" + pic.data = cover_data + audio.clear_pictures() + audio.add_picture(pic) + audio.save() + log("[+] Metadata embedded in FLAC") + + elif ext == ".mp3": + from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK, TPOS, TCOP, TDRC, TSRC, APIC + try: + audio = ID3(file_path) + except Exception: + from mutagen.id3 import ID3NoHeaderError + audio = ID3() + audio.add(TIT2(encoding=3, text=title)) + audio.add(TPE1(encoding=3, text=all_artists)) + audio.add(TALB(encoding=3, text=album_title)) + if track_num: + audio.add(TRCK(encoding=3, text=str(track_num))) + if disc_num: + audio.add(TPOS(encoding=3, text=str(disc_num))) + if copyright_text: + audio.add(TCOP(encoding=3, text=copyright_text)) + if year: + audio.add(TDRC(encoding=3, text=year)) + if isrc: + audio.add(TSRC(encoding=3, text=isrc)) + if cover_data: + audio.add(APIC(encoding=3, mime="image/jpeg", type=3, desc="Cover", data=cover_data)) + audio.save(file_path) + log("[+] Metadata embedded in MP3") + + elif ext == ".m4a": + from mutagen.mp4 import MP4, MP4Cover + audio = MP4(file_path) + audio["\xa9nam"] = [title] + audio["\xa9ART"] = [all_artists] + audio["\xa9alb"] = [album_title] + if track_num: + audio["trkn"] = [(track_num, 0)] + if disc_num: + audio["disk"] = [(disc_num, 0)] + if copyright_text: + audio["cprt"] = [copyright_text] + if year: + audio["\xa9day"] = [year] + if cover_data: + audio["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)] + audio.save() + log("[+] Metadata embedded in M4A") + + except Exception as e: + log(f"[!] Failed to embed metadata: {e}") + + +def sanitize_filename(name): + return re.sub(r'[<>:"/\\|?*]', '_', name) + + +def convert_to_mp3(input_path, output_path, bitrate="320k", log=None): + """Convert audio file to MP3 using ffmpeg.""" + if log is None: + log = print + ffmpeg = shutil.which("ffmpeg") + if not ffmpeg: + log("[!] ffmpeg not found in PATH. Install ffmpeg to enable MP3 conversion.") + log(f"[*] Keeping FLAC file: {input_path}") + return False + + log(f"[*] Converting to MP3 ({bitrate}) ...") + try: + subprocess.run( + [ffmpeg, "-i", input_path, "-ab", bitrate, "-map_metadata", "0", "-y", output_path], + check=True, capture_output=True, + ) + os.remove(input_path) + log(f"[+] Converted to {output_path}") + return True + except subprocess.CalledProcessError as e: + log(f"[!] ffmpeg conversion failed: {e.stderr.decode()[:200]}") + log(f"[*] Keeping FLAC file: {input_path}") + return False + + +def main(): + parser = argparse.ArgumentParser(description="Download a song from Monochrome") + parser.add_argument("track_id", help="Track ID (from the URL, e.g. 12345678)") + parser.add_argument("--quality", default="HI_RES_LOSSLESS", + choices=["HI_RES_LOSSLESS", "LOSSLESS", "HIGH", "LOW", "MP3_320"], + help="Audio quality (default: HI_RES_LOSSLESS)") + parser.add_argument("--output", "-o", help="Output filename (auto-detected if omitted)") + args = parser.parse_args() + + want_mp3 = args.quality == "MP3_320" + # MP3_320 isn't a real API quality — download as LOSSLESS then convert + api_quality = "LOSSLESS" if want_mp3 else args.quality + + instances = discover_instances() + + # Try to get track metadata for a nice filename + print(f"[*] Fetching metadata for track {args.track_id} ...") + info = get_track_info(instances, args.track_id) + if info: + title = info.get("title", args.track_id) + artist = info.get("artist", {}) + if isinstance(artist, dict): + artist = artist.get("name", "Unknown") + print(f"[*] Track: {artist} - {title}") + else: + print("[*] Could not fetch metadata (will use track ID for filename)") + + # Get the stream URL — try Tidal instances first, then Qobuz + print(f"[*] Requesting stream (quality={api_quality}) ...") + stream_url, track_data = get_stream_url_tidal(instances, args.track_id, api_quality) + + if not stream_url: + print("[*] Tidal instances failed, trying Qobuz ...") + stream_url = get_stream_url_qobuz(args.track_id, api_quality) + + if not stream_url: + print("[!] Could not get a stream URL from any source.") + sys.exit(1) + + # Merge metadata from track_data if we didn't get it earlier + if not info and track_data and isinstance(track_data, dict): + info = track_data + + # Build filename helper + def make_filename(ext): + if args.output: + return args.output + if info and info.get("title"): + t = info.get("title", args.track_id) + a = info.get("artist", {}) + if isinstance(a, dict): + a = a.get("name", "Unknown") + elif not isinstance(a, str): + a = "Unknown" + return sanitize_filename(f"{a} - {t}{ext}") + return f"{args.track_id}{ext}" + + # Determine extensions based on quality + if want_mp3: + flac_path = make_filename(".flac") + mp3_path = make_filename(".mp3") + elif api_quality in ("HIGH", "LOW"): + flac_path = make_filename(".m4a") + mp3_path = None + else: + flac_path = make_filename(".flac") + mp3_path = None + + # Fetch cover art + cover_data = None + if info: + cover_data = fetch_cover_art(info.get("album")) + + # Download + print(f"[*] Stream URL: {stream_url[:100]}...") + download_file(stream_url, flac_path) + + # Embed metadata into the downloaded file + embed_metadata(flac_path, info, cover_data) + + # Convert to MP3 if requested + if want_mp3: + convert_to_mp3(flac_path, mp3_path) + # Re-embed metadata into MP3 (ffmpeg may not carry everything over) + embed_metadata(mp3_path, info, cover_data) + + +if __name__ == "__main__": + # Allow running as standalone script + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + main() diff --git a/monochrome/spotify_to_ids.py b/monochrome/spotify_to_ids.py new file mode 100644 index 0000000..1a50768 --- /dev/null +++ b/monochrome/spotify_to_ids.py @@ -0,0 +1,269 @@ +""" +Convert Spotify URLs to Monochrome/Tidal track IDs. + +Usage: + python spotify_to_ids.py [...] [-v] [--threshold N] + +Supports track, album, and playlist URLs. Outputs one track ID per line (stdout). + +Examples: + python spotify_to_ids.py https://open.spotify.com/track/4PTG3Z6ehGkBFwjybzWkR8 + python spotify_to_ids.py -v https://open.spotify.com/album/4aawyAB9vmqN3uQ7FjRGTy + python spotify_to_ids.py https://open.spotify.com/playlist/xxx | xargs -I{} python download.py {} +""" + +import argparse +import json +import os +import random +import re +import sys +import time +import urllib.error +import urllib.parse +import urllib.request + +from monochrome import fetch, fetch_json, discover_instances + + +# --- Spotify URL parsing --- + +def parse_spotify_url(url): + """Parse a Spotify URL into (type, id). Returns (None, None) on failure.""" + match = re.match( + r'https?://open\.spotify\.com/(?:intl-\w+/)?(track|album|playlist)/([a-zA-Z0-9]+)', + url.strip() + ) + if not match: + return None, None + return match.group(1), match.group(2) + + +# --- Spotify metadata extraction --- + +def fetch_spotify_embed(sp_type, sp_id): + """Fetch Spotify embed page and extract __NEXT_DATA__ JSON.""" + url = f"https://open.spotify.com/embed/{sp_type}/{sp_id}" + try: + with fetch(url, timeout=15, use_ssl_ctx=False) as resp: + html = resp.read().decode() + except Exception as e: + print(f"[!] Failed to fetch Spotify embed: {e}", file=sys.stderr) + return None + + match = re.search( + r'\s*({.+?})\s*', + html, re.DOTALL + ) + if match: + try: + return json.loads(match.group(1)) + except json.JSONDecodeError: + pass + + print("[!] __NEXT_DATA__ not found in embed page", file=sys.stderr) + return None + + +def fetch_spotify_oembed(sp_type, sp_id): + """Fallback: use oEmbed API to get at least a title string.""" + spotify_url = f"https://open.spotify.com/{sp_type}/{sp_id}" + oembed_url = f"https://open.spotify.com/oembed?url={urllib.parse.quote(spotify_url, safe='')}" + try: + data = fetch_json(oembed_url, timeout=15, use_ssl_ctx=False) + return data.get("title", "") + except Exception: + return None + + +def extract_collection_name(embed_data, sp_type): + """Extract album/playlist name from __NEXT_DATA__ JSON. Returns None for single tracks.""" + if not embed_data or sp_type == "track": + return None + try: + entity = embed_data["props"]["pageProps"]["state"]["data"]["entity"] + return entity.get("name") or entity.get("title") + except (KeyError, TypeError, IndexError): + return None + + +def extract_tracks(embed_data, sp_type, sp_id): + """Extract list of {title, artist} dicts from __NEXT_DATA__ JSON. + Falls back to oEmbed if embed data is missing or malformed.""" + if embed_data: + try: + entity = embed_data["props"]["pageProps"]["state"]["data"]["entity"] + + if sp_type == "track": + title = entity.get("name") or entity.get("title", "") + artists = entity.get("artists") + if artists and isinstance(artists, list): + artist = artists[0].get("name", "") + else: + artist = entity.get("subtitle", "") + if title: + return [{"title": title, "artist": artist}] + + elif sp_type in ("album", "playlist"): + track_list = entity.get("trackList", []) + if track_list: + tracks = [] + for t in track_list: + title = t.get("title", "") + artist = t.get("subtitle", "") + if title: + tracks.append({"title": title, "artist": artist}) + if tracks: + return tracks + except (KeyError, TypeError, IndexError): + pass + + # Fallback: oEmbed (single tracks only, limited data) + if sp_type == "track": + oembed_title = fetch_spotify_oembed(sp_type, sp_id) + if oembed_title: + print(f'[*] Using oEmbed fallback: "{oembed_title}"', file=sys.stderr) + return [{"title": oembed_title, "artist": ""}] + + return [] + + +# --- Fuzzy matching --- + +def normalize(text): + """Normalize text for comparison: lowercase, strip feat/remaster/punctuation.""" + text = text.lower() + text = re.sub(r'\(feat\.?[^)]*\)', '', text) + text = re.sub(r'\(ft\.?[^)]*\)', '', text) + text = re.sub(r'\(remaster(ed)?\)', '', text, flags=re.IGNORECASE) + text = re.sub(r'[^\w\s]', ' ', text) + return ' '.join(text.split()) + + +def similarity(a, b): + """Token overlap ratio (Jaccard index).""" + tokens_a = set(normalize(a).split()) + tokens_b = set(normalize(b).split()) + if not tokens_a or not tokens_b: + return 0.0 + return len(tokens_a & tokens_b) / len(tokens_a | tokens_b) + + +def find_best_match(results, target_title, target_artist, threshold=0.4): + """Find the best matching track from Monochrome search results.""" + best = None + best_score = 0 + + for r in results: + r_title = r.get("title", "") + r_artist_obj = r.get("artist", {}) + if isinstance(r_artist_obj, dict): + r_artist = r_artist_obj.get("name", "") + else: + r_artist = str(r_artist_obj) + + title_sim = similarity(target_title, r_title) + artist_sim = similarity(target_artist, r_artist) if target_artist else 0.5 + score = 0.6 * title_sim + 0.4 * artist_sim + + if score > best_score: + best_score = score + best = r + + if best and best_score >= threshold: + return best, best_score + return None, 0 + + +# --- Monochrome search --- + +def search_monochrome(instances, query, log=None): + """Search Monochrome instances for tracks matching a query string.""" + if log is None: + log = print + shuffled = list(instances) + random.shuffle(shuffled) + encoded = urllib.parse.quote(query) + + for base in shuffled: + url = f"{base}/search/?s={encoded}" + try: + data = fetch_json(url, timeout=15) + if isinstance(data, dict) and "data" in data and "version" in data: + data = data["data"] + if isinstance(data, dict) and "items" in data: + return data["items"] + if isinstance(data, list): + return data + if isinstance(data, dict) and "tracks" in data: + return data["tracks"] + except Exception: + continue + return [] + + +# --- Main --- + +def main(): + parser = argparse.ArgumentParser( + description="Convert Spotify URLs to Monochrome/Tidal track IDs" + ) + parser.add_argument("urls", nargs="+", help="Spotify track/album/playlist URLs") + parser.add_argument("-v", "--verbose", action="store_true", + help="Show matched title/artist alongside IDs") + parser.add_argument("--threshold", type=float, default=0.4, + help="Minimum match score 0-1 (default: 0.4)") + args = parser.parse_args() + + instances = discover_instances() + found = 0 + missed = 0 + + for url in args.urls: + sp_type, sp_id = parse_spotify_url(url) + if not sp_type: + print(f"[!] Invalid Spotify URL: {url}", file=sys.stderr) + continue + + print(f"[*] Fetching Spotify {sp_type}: {sp_id}", file=sys.stderr) + embed_data = fetch_spotify_embed(sp_type, sp_id) + tracks = extract_tracks(embed_data, sp_type, sp_id) + + if not tracks: + print(f"[!] Could not extract tracks from {url}", file=sys.stderr) + continue + + print(f"[*] Found {len(tracks)} track(s) on Spotify", file=sys.stderr) + + for i, track in enumerate(tracks): + query = f"{track['artist']} {track['title']}".strip() + print(f"[*] Searching: {query}", file=sys.stderr) + + results = search_monochrome(instances, query) + match, score = find_best_match(results, track["title"], track["artist"], args.threshold) + + if match: + tid = match.get("id") + found += 1 + if args.verbose: + m_title = match.get("title", "?") + m_artist_obj = match.get("artist", {}) + m_artist = m_artist_obj.get("name", "?") if isinstance(m_artist_obj, dict) else str(m_artist_obj) + print(f"{tid}\t{m_artist} - {m_title}\t(score: {score:.2f})") + else: + print(tid) + else: + missed += 1 + print(f"[!] No match: {track['artist']} - {track['title']}", file=sys.stderr) + + # Rate limit delay between searches (skip after last track) + if i < len(tracks) - 1: + time.sleep(0.5) + + print(f"\n[*] Done: {found} matched, {missed} missed", file=sys.stderr) + + +if __name__ == "__main__": + # Allow running as standalone script + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + main() diff --git a/requirements.txt b/requirements.txt index 398de43..a6ac7e4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ flask==3.1.0 gunicorn==23.0.0 +mutagen diff --git a/templates/index.html b/templates/index.html index 51a684c..ce0e148 100644 --- a/templates/index.html +++ b/templates/index.html @@ -145,7 +145,7 @@ .card { padding: 16px; } body { padding-bottom: 64px; } textarea, select, input { font-size: 1rem; } - #btn-download { width: 100%; border-radius: var(--radius); } + #btn-download, #btn-monochrome { width: 100%; border-radius: var(--radius); } .job-preview { max-width: calc(100vw - 120px); } #toast { left: 16px; right: 16px; bottom: 72px; max-width: none; } #bottom-nav { display: flex; } @@ -156,7 +156,8 @@

Votify Web

- + + @@ -167,7 +168,7 @@
-

New Download

+

Votify Download

@@ -235,6 +236,30 @@
+ +
+
+

Monochrome Download

+
+ + +
+
+
+ + +
+
+ +
+
+
@@ -290,7 +315,11 @@