"""
Monochrome song downloader.

Usage:
    python download.py <track_id> [--quality QUALITY] [--output FILENAME]

Track IDs can be found in the URL when viewing a track on the site, e.g.:
    https://monochrome.app/track/12345678  ->  track_id = 12345678

Quality options:
    HI_RES_LOSSLESS  (default, highest available)
    LOSSLESS         (16-bit/44.1kHz FLAC)
    HIGH             (AAC 320kbps)
    LOW              (AAC 96kbps)
    MP3_320          (downloads as LOSSLESS, converts to MP3 via ffmpeg)
"""

import argparse
import base64
import html
import json
import os
import random
import re
import shutil
import subprocess
import sys

from monochrome import fetch, fetch_json, discover_instances, SSL_CTX, QOBUZ_API
from utils import sanitize_filename

# Read size for streaming downloads.
_CHUNK_SIZE = 64 * 1024


def _unwrap_envelope(data):
    """Return the payload of a {"version": ..., "data": ...} envelope, else *data* unchanged."""
    if isinstance(data, dict) and "data" in data and "version" in data:
        return data["data"]
    return data


def _first_value(sources, keys):
    """Return the first truthy value stored under any of *keys* in the dicts of *sources*."""
    for source in sources:
        if not isinstance(source, dict):
            continue
        for key in keys:
            value = source.get(key)
            if value:
                return value
    return None


def _artist_name(info):
    """Return a printable artist name from a track-info dict ("Unknown" if absent)."""
    artist = info.get("artist")
    if isinstance(artist, dict):
        return artist.get("name", "Unknown")
    if isinstance(artist, str):
        return artist
    return "Unknown"


def _to_int(value):
    """Return *value* as an int, or None when it is missing or not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def extract_stream_url_from_manifest(manifest_b64, log=None):
    """Decode a base64 manifest and extract a stream URL (mirrors frontend logic).

    Three manifest shapes are tried in order: a DASH XML document containing a
    BaseURL element, a JSON object with a "urls" list or a "url" entry, and
    finally any raw http(s) URL found in the decoded text.

    Args:
        manifest_b64: Base64-encoded manifest (str or bytes).
        log: Callable used for progress messages; defaults to print.

    Returns:
        The extracted URL, or None if the manifest cannot be decoded or
        contains no usable URL.
    """
    if log is None:
        log = print

    try:
        decoded = base64.b64decode(manifest_b64).decode("utf-8", errors="replace")
    except (ValueError, TypeError):
        # binascii.Error is a ValueError; TypeError covers non-string input.
        return None

    # Type 1: DASH manifest XML — extract BaseURL
    if "<MPD" in decoded:
        match = re.search(r"<BaseURL>(https?://[^<]+)</BaseURL>", decoded)
        if match:
            log("[*] Extracted URL from DASH manifest")
            # XML text escapes "&" as "&amp;"; undo that so the URL is usable.
            return html.unescape(match.group(1))
        log("[!] DASH manifest found but no BaseURL — DASH streaming not supported in CLI")
        return None

    # Type 2: JSON with urls array
    try:
        parsed = json.loads(decoded)
    except (json.JSONDecodeError, ValueError):
        parsed = None
    if isinstance(parsed, dict):
        urls = parsed.get("urls")
        if isinstance(urls, list) and urls:
            log("[*] Extracted URL from JSON manifest")
            return urls[0]
        url = parsed.get("url")
        if url:
            return url

    # Type 3: Raw URL in the decoded string
    match = re.search(r"https?://[\w\-.~:?#\[@!$&'()*+,;=%/]+", decoded)
    if match:
        log("[*] Extracted URL from raw manifest")
        return match.group(0)

    return None


def get_stream_url_tidal(instances, track_id, quality, log=None):
    """Get a stream URL from Tidal-based instances using the /track/ endpoint.

    Instances are tried in random order until one yields a URL.

    Args:
        instances: Iterable of instance base URLs.
        track_id: Track identifier to request.
        quality: Quality string passed through to the API.
        log: Callable used for progress messages; defaults to print.

    Returns:
        A (stream_url, track_obj) tuple, or (None, None) if every instance
        failed. track_obj is the first element of a list response, or the
        response dict itself.
    """
    if log is None:
        log = print

    shuffled = list(instances)
    random.shuffle(shuffled)

    for base in shuffled:
        url = f"{base}/track/?id={track_id}&quality={quality}"
        log(f"[*] Trying {base} ...")
        try:
            with fetch(url) as resp:
                status = resp.status
                if status == 429:
                    log(" Rate limited, skipping")
                    continue
                if status >= 400:
                    log(f" HTTP {status}")
                    continue
                body = resp.read().decode()

            # Unwrap {"version": "...", "data": {...}} envelope
            data = _unwrap_envelope(json.loads(body))

            # Response can be an array [trackObj, streamInfo] or an object
            if isinstance(data, list) and len(data) >= 2:
                track_obj, stream_info = data[0], data[1]
            elif isinstance(data, dict):
                track_obj = stream_info = data
            else:
                log(f" Unexpected response format: {type(data)}")
                continue

            # stream_info takes precedence over the top-level object.
            sources = (stream_info, data)

            # Check for OriginalTrackUrl first (direct URL, no decoding needed)
            original_url = _first_value(sources, ("OriginalTrackUrl", "originalTrackUrl"))
            if original_url:
                log(f"[+] Got direct stream URL from {base}")
                return original_url, track_obj

            # Fall back to manifest decoding
            manifest = _first_value(sources, ("manifest",))
            if manifest:
                stream_url = extract_stream_url_from_manifest(manifest, log=log)
                if stream_url:
                    log(f"[+] Got stream URL from manifest ({base})")
                    return stream_url, track_obj
                log(" Could not extract URL from manifest")
                continue

            # Maybe the response itself contains a URL
            direct = _first_value((data,), ("url", "streamUrl", "stream_url"))
            if direct:
                log(f"[+] Got stream URL from {base}")
                return direct, track_obj

            log(" No stream URL found in response")
            if isinstance(data, dict):
                log(f" Response keys: {list(data.keys())}")
            elif isinstance(data, list):
                for i, item in enumerate(data):
                    if isinstance(item, dict):
                        log(f" [{i}] keys: {list(item.keys())}")
        except json.JSONDecodeError as e:
            log(f" Invalid JSON: {e}")
        except Exception as e:
            # Best effort: any failure on one instance just moves on to the next.
            log(f" Failed: {e}")

    return None, None


def get_stream_url_qobuz(track_id, quality, log=None):
    """Try the Qobuz API as an alternative source.

    Args:
        track_id: Track identifier to request.
        quality: One of the CLI quality names; unknown names map to "5".
        log: Callable used for progress messages; defaults to print.

    Returns:
        The stream URL, or None if the request failed or carried no URL.
    """
    if log is None:
        log = print

    # NOTE(review): Qobuz's own format IDs are usually documented as 5 = MP3 320,
    # 6/7/27 = FLAC of increasing resolution, which would make this map look
    # inverted. Left unchanged — confirm against the QOBUZ_API service.
    qobuz_quality_map = {
        "MP3_320": "27",
        "LOSSLESS": "7",
        "HI_RES_LOSSLESS": "5",
    }
    q = qobuz_quality_map.get(quality, "5")

    url = f"{QOBUZ_API}/download-music?track_id={track_id}&quality={q}"
    log("[*] Trying Qobuz API ...")
    try:
        data = fetch_json(url)
        if isinstance(data, dict):
            # Preferred shape: {"success": true, "data": {"url": ...}}
            nested = data.get("data") if data.get("success") else None
            if isinstance(nested, dict) and nested.get("url"):
                log("[+] Got stream URL from Qobuz")
                return nested["url"]
            # Fallback shape: {"url": ...}
            if data.get("url"):
                log("[+] Got stream URL from Qobuz")
                return data["url"]
    except Exception as e:
        log(f" Qobuz failed: {e}")

    return None


def get_track_info(instances, track_id, log=None):
    """Fetch track metadata (used for the output filename and tags).

    Each instance is queried in random order, first via /info/ and then via
    /track/, until a dict with a "title" is found.

    Args:
        instances: Iterable of instance base URLs.
        track_id: Track identifier to look up.
        log: Accepted for signature parity with the other helpers; unused.

    Returns:
        The track-info dict, or None if no instance returned a title.
    """
    if log is None:
        log = print

    shuffled = list(instances)
    random.shuffle(shuffled)

    endpoints = (f"/info/?id={track_id}", f"/track/?id={track_id}&quality=LOSSLESS")
    for base in shuffled:
        for endpoint in endpoints:
            try:
                data = _unwrap_envelope(fetch_json(f"{base}{endpoint}"))
            except Exception:
                # Metadata is optional; silently try the next endpoint/instance.
                continue
            if isinstance(data, list) and data:
                data = data[0]
            if isinstance(data, dict) and data.get("title"):
                return data

    return None


def download_file(url, output_path, log=None):
    """Download *url* to *output_path*, logging progress roughly every 10%.

    Data is written to "<output_path>.part" and renamed into place only after
    the transfer completes, so a failed or interrupted download never leaves a
    truncated file at *output_path*.

    Args:
        url: URL to download.
        output_path: Destination file path.
        log: Callable used for progress messages; defaults to print.

    Raises:
        Whatever fetch() or the file operations raise; the partial file is
        removed before the exception propagates.
    """
    if log is None:
        log = print

    part_path = f"{output_path}.part"
    try:
        with fetch(url, timeout=120) as resp, open(part_path, "wb") as f:
            # A missing or malformed Content-Length only disables progress output.
            total = _to_int(resp.headers.get("Content-Length")) or None
            downloaded = 0
            last_pct = -1
            while True:
                chunk = resp.read(_CHUNK_SIZE)
                if not chunk:
                    break
                f.write(chunk)
                downloaded += len(chunk)
                if total:
                    pct = int(downloaded / total * 100)
                    # Only log every 10% to avoid flooding
                    if pct // 10 > last_pct // 10:
                        mb = downloaded / (1024 * 1024)
                        total_mb = total / (1024 * 1024)
                        log(f"[*] Downloading: {mb:.1f}/{total_mb:.1f} MB ({pct}%)")
                        last_pct = pct
        os.replace(part_path, output_path)
    except BaseException:
        # BaseException so Ctrl-C also cleans up the partial file.
        try:
            os.remove(part_path)
        except OSError:
            pass
        raise

    log(f"[+] Saved to {output_path}")


def fetch_cover_art(album_info, log=None):
    """Download album cover art from the Tidal CDN.

    Args:
        album_info: Album dict expected to carry a "cover" identifier.
        log: Callable used for progress messages; defaults to print.

    Returns:
        The image bytes (requested as a 1280x1280 JPEG), or None when there is
        no cover id or the request fails.
    """
    if log is None:
        log = print

    if not album_info or not isinstance(album_info, dict):
        return None
    cover_id = album_info.get("cover")
    if not cover_id:
        return None

    # Tidal CDN uses slashes instead of dashes in the cover UUID
    formatted = str(cover_id).replace("-", "/")
    url = f"https://resources.tidal.com/images/{formatted}/1280x1280.jpg"
    log("[*] Fetching album art ...")
    try:
        with fetch(url) as resp:
            return resp.read()
    except Exception as e:
        log(f"[!] Could not fetch cover art: {e}")
        return None


def _collect_tags(info):
    """Flatten a track-info dict into the plain tag values the embedders need."""
    artist_obj = info.get("artist") or {}
    if isinstance(artist_obj, dict):
        artist_name = artist_obj.get("name") or ""
    else:
        artist_name = str(artist_obj)

    # Join all artists for the artist tag; fall back to the primary artist.
    artists = info.get("artists")
    joined = ""
    if isinstance(artists, list):
        joined = ", ".join(
            str(a["name"]) for a in artists if isinstance(a, dict) and a.get("name")
        )

    album_obj = info.get("album")
    album_title = (album_obj.get("title") or "") if isinstance(album_obj, dict) else ""

    release_date = str(info.get("streamStartDate") or "")

    return {
        "title": info.get("title") or "",
        "artist": joined or artist_name,
        "album": album_title,
        "track": info.get("trackNumber"),
        "disc": info.get("volumeNumber"),
        "copyright": info.get("copyright") or "",
        "isrc": info.get("isrc") or "",
        "year": release_date[:4] if len(release_date) >= 4 else "",
    }


def _embed_flac(file_path, tags, cover_data):
    """Write *tags* (and optional front cover) into a FLAC file."""
    from mutagen.flac import FLAC, Picture

    audio = FLAC(file_path)
    audio["title"] = tags["title"]
    audio["artist"] = tags["artist"]
    audio["album"] = tags["album"]
    optional = (
        ("tracknumber", "track"),
        ("discnumber", "disc"),
        ("copyright", "copyright"),
        ("isrc", "isrc"),
        ("date", "year"),
    )
    for field, key in optional:
        if tags[key]:
            audio[field] = str(tags[key])
    if cover_data:
        pic = Picture()
        pic.type = 3  # front cover
        pic.mime = "image/jpeg"
        pic.data = cover_data
        audio.clear_pictures()
        audio.add_picture(pic)
    audio.save()


def _embed_mp3(file_path, tags, cover_data):
    """Write *tags* (and optional front cover) into an MP3 file as ID3 frames."""
    from mutagen.id3 import (
        APIC,
        ID3,
        ID3NoHeaderError,
        TALB,
        TCOP,
        TDRC,
        TIT2,
        TPE1,
        TPOS,
        TRCK,
        TSRC,
    )

    try:
        audio = ID3(file_path)
    except ID3NoHeaderError:
        # File has no ID3 tag yet; start a fresh one.
        audio = ID3()

    audio.add(TIT2(encoding=3, text=tags["title"]))
    audio.add(TPE1(encoding=3, text=tags["artist"]))
    audio.add(TALB(encoding=3, text=tags["album"]))
    optional = (
        (TRCK, "track"),
        (TPOS, "disc"),
        (TCOP, "copyright"),
        (TDRC, "year"),
        (TSRC, "isrc"),
    )
    for frame, key in optional:
        if tags[key]:
            audio.add(frame(encoding=3, text=str(tags[key])))
    if cover_data:
        audio.add(APIC(encoding=3, mime="image/jpeg", type=3, desc="Cover", data=cover_data))
    audio.save(file_path)


def _embed_m4a(file_path, tags, cover_data):
    """Write *tags* (and optional cover) into an M4A/MP4 file."""
    from mutagen.mp4 import MP4, MP4Cover

    audio = MP4(file_path)
    audio["\xa9nam"] = [tags["title"]]
    audio["\xa9ART"] = [tags["artist"]]
    audio["\xa9alb"] = [tags["album"]]
    # trkn/disk are written as (number, total) integer pairs; skip non-numeric values.
    track_num = _to_int(tags["track"])
    if track_num:
        audio["trkn"] = [(track_num, 0)]
    disc_num = _to_int(tags["disc"])
    if disc_num:
        audio["disk"] = [(disc_num, 0)]
    if tags["copyright"]:
        audio["cprt"] = [str(tags["copyright"])]
    if tags["year"]:
        audio["\xa9day"] = [tags["year"]]
    if cover_data:
        audio["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)]
    audio.save()


def embed_metadata(file_path, info, cover_data=None, log=None):
    """Embed metadata and cover art into an audio file using mutagen.

    The container is chosen from the file extension (.flac, .mp3 or .m4a);
    other extensions are left untouched. Failures are logged, never raised.

    Args:
        file_path: Path of the audio file to tag.
        info: Track-info dict; nothing is done when it is empty or not a dict.
        cover_data: Optional JPEG bytes for the front cover.
        log: Callable used for progress messages; defaults to print.
    """
    if log is None:
        log = print
    if not info or not isinstance(info, dict):
        return

    ext = os.path.splitext(file_path)[1].lower()
    tags = _collect_tags(info)

    log("[*] Embedding metadata ...")
    try:
        # mutagen is imported lazily inside the helpers so that a missing
        # install is reported here instead of breaking the whole script.
        if ext == ".flac":
            _embed_flac(file_path, tags, cover_data)
            log("[+] Metadata embedded in FLAC")
        elif ext == ".mp3":
            _embed_mp3(file_path, tags, cover_data)
            log("[+] Metadata embedded in MP3")
        elif ext == ".m4a":
            _embed_m4a(file_path, tags, cover_data)
            log("[+] Metadata embedded in M4A")
    except Exception as e:
        log(f"[!] Failed to embed metadata: {e}")


def convert_to_mp3(input_path, output_path, bitrate="320k", log=None):
    """Convert an audio file to MP3 using ffmpeg, deleting the input on success.

    Args:
        input_path: Source audio file.
        output_path: Destination MP3 file; must differ from *input_path*.
        bitrate: Audio bitrate passed to ffmpeg.
        log: Callable used for progress messages; defaults to print.

    Returns:
        True if the conversion succeeded, False otherwise (the input file is
        kept in that case).
    """
    if log is None:
        log = print

    ffmpeg = shutil.which("ffmpeg")
    if not ffmpeg:
        log("[!] ffmpeg not found in PATH. Install ffmpeg to enable MP3 conversion.")
        log(f"[*] Keeping FLAC file: {input_path}")
        return False

    # ffmpeg cannot transcode a file onto itself.
    if os.path.abspath(input_path) == os.path.abspath(output_path):
        log("[!] ffmpeg conversion failed: input and output paths are identical")
        log(f"[*] Keeping FLAC file: {input_path}")
        return False

    log(f"[*] Converting to MP3 ({bitrate}) ...")
    try:
        subprocess.run(
            [ffmpeg, "-i", input_path, "-b:a", bitrate, "-map_metadata", "0", "-y", output_path],
            check=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        stderr = (e.stderr or b"").decode(errors="replace")
        log(f"[!] ffmpeg conversion failed: {stderr[:200]}")
        log(f"[*] Keeping FLAC file: {input_path}")
        return False

    try:
        os.remove(input_path)
    except OSError as e:
        # The MP3 exists; a leftover source file is not a conversion failure.
        log(f"[!] Could not remove {input_path}: {e}")
    log(f"[+] Converted to {output_path}")
    return True


def main():
    """Parse CLI arguments, then download, tag and optionally convert one track."""
    parser = argparse.ArgumentParser(description="Download a song from Monochrome")
    parser.add_argument("track_id", help="Track ID (from the URL, e.g. 12345678)")
    parser.add_argument("--quality", default="HI_RES_LOSSLESS",
                        choices=["HI_RES_LOSSLESS", "LOSSLESS", "HIGH", "LOW", "MP3_320"],
                        help="Audio quality (default: HI_RES_LOSSLESS)")
    parser.add_argument("--output", "-o", help="Output filename (auto-detected if omitted)")
    args = parser.parse_args()

    want_mp3 = args.quality == "MP3_320"
    # MP3_320 isn't a real API quality — download as LOSSLESS then convert
    api_quality = "LOSSLESS" if want_mp3 else args.quality

    instances = discover_instances()

    # Try to get track metadata for a nice filename
    print(f"[*] Fetching metadata for track {args.track_id} ...")
    info = get_track_info(instances, args.track_id)
    if info:
        title = info.get("title", args.track_id)
        print(f"[*] Track: {_artist_name(info)} - {title}")
    else:
        print("[*] Could not fetch metadata (will use track ID for filename)")

    # Get the stream URL — try Tidal instances first, then Qobuz
    print(f"[*] Requesting stream (quality={api_quality}) ...")
    stream_url, track_data = get_stream_url_tidal(instances, args.track_id, api_quality)

    if not stream_url:
        print("[*] Tidal instances failed, trying Qobuz ...")
        stream_url = get_stream_url_qobuz(args.track_id, api_quality)

    if not stream_url:
        print("[!] Could not get a stream URL from any source.")
        sys.exit(1)

    # Merge metadata from track_data if we didn't get it earlier
    if not info and track_data and isinstance(track_data, dict):
        info = track_data

    # Build filename helper
    def make_filename(ext):
        if args.output:
            return args.output
        if info and info.get("title"):
            t = info.get("title", args.track_id)
            return sanitize_filename(f"{_artist_name(info)} - {t}{ext}")
        return f"{args.track_id}{ext}"

    # Determine extensions based on quality
    mp3_path = None
    if want_mp3:
        mp3_path = make_filename(".mp3")
        download_path = make_filename(".flac")
        if download_path == mp3_path:
            # --output was given: the intermediate FLAC needs its own path,
            # otherwise ffmpeg would be asked to overwrite its own input.
            download_path = f"{os.path.splitext(mp3_path)[0]}.flac"
            if download_path == mp3_path:
                download_path = f"{mp3_path}.src.flac"
    elif api_quality in ("HIGH", "LOW"):
        download_path = make_filename(".m4a")
    else:
        download_path = make_filename(".flac")

    # Fetch cover art
    cover_data = None
    if info:
        cover_data = fetch_cover_art(info.get("album"))

    # Download
    print(f"[*] Stream URL: {stream_url[:100]}...")
    try:
        download_file(stream_url, download_path)
    except Exception as e:
        print(f"[!] Download failed: {e}")
        sys.exit(1)

    # Embed metadata into the downloaded file
    embed_metadata(download_path, info, cover_data)

    # Convert to MP3 if requested
    if want_mp3 and convert_to_mp3(download_path, mp3_path):
        # Re-embed metadata into MP3 (ffmpeg may not carry everything over).
        # Skipped when conversion failed, since there is no MP3 to tag.
        embed_metadata(mp3_path, info, cover_data)


if __name__ == "__main__":
    # Allow running as standalone script
    # NOTE(review): this runs after the module-level project imports above have
    # already been resolved, so it cannot help them — confirm whether the
    # path tweak is still needed or should move above those imports.
    sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    main()