481 lines
17 KiB
Python
481 lines
17 KiB
Python
"""
|
|
Monochrome song downloader.
|
|
|
|
Usage:
|
|
python download.py <track_id> [--quality QUALITY] [--output FILENAME]
|
|
|
|
Track IDs can be found in the URL when viewing a track on the site, e.g.:
|
|
https://monochrome.app/track/12345678 -> track_id = 12345678
|
|
|
|
Quality options:
|
|
HI_RES_LOSSLESS (default, highest available)
|
|
LOSSLESS (16-bit/44.1kHz FLAC)
|
|
HIGH (AAC 320kbps)
|
|
LOW (AAC 96kbps)
|
|
MP3_320 (downloads as LOSSLESS, converts to MP3 via ffmpeg)
|
|
"""
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import os
|
|
import random
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
|
|
from monochrome import fetch, fetch_json, discover_instances, SSL_CTX, QOBUZ_API
|
|
from utils import sanitize_filename
|
|
|
|
|
|
def extract_stream_url_from_manifest(manifest_b64, log=None):
|
|
"""Decode base64 manifest and extract stream URL (mirrors frontend logic)."""
|
|
if log is None:
|
|
log = print
|
|
try:
|
|
decoded = base64.b64decode(manifest_b64).decode("utf-8", errors="replace")
|
|
except Exception:
|
|
return None
|
|
|
|
# Type 1: DASH manifest XML — extract BaseURL
|
|
if "<MPD" in decoded:
|
|
match = re.search(r"<BaseURL>(https?://[^<]+)</BaseURL>", decoded)
|
|
if match:
|
|
log("[*] Extracted URL from DASH manifest")
|
|
return match.group(1)
|
|
log("[!] DASH manifest found but no BaseURL — DASH streaming not supported in CLI")
|
|
return None
|
|
|
|
# Type 2: JSON with urls array
|
|
try:
|
|
parsed = json.loads(decoded)
|
|
if isinstance(parsed, dict):
|
|
urls = parsed.get("urls")
|
|
if isinstance(urls, list) and urls:
|
|
log("[*] Extracted URL from JSON manifest")
|
|
return urls[0]
|
|
url = parsed.get("url")
|
|
if url:
|
|
return url
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
# Type 3: Raw URL in the decoded string
|
|
match = re.search(r"https?://[\w\-.~:?#\[@!$&'()*+,;=%/]+", decoded)
|
|
if match:
|
|
log("[*] Extracted URL from raw manifest")
|
|
return match.group(0)
|
|
|
|
return None
|
|
|
|
|
|
def get_stream_url_tidal(instances, track_id, quality, log=None):
|
|
"""Get stream URL from Tidal-based instances using /track/ endpoint."""
|
|
if log is None:
|
|
log = print
|
|
shuffled = list(instances)
|
|
random.shuffle(shuffled)
|
|
|
|
for base in shuffled:
|
|
url = f"{base}/track/?id={track_id}&quality={quality}"
|
|
log(f"[*] Trying {base} ...")
|
|
try:
|
|
with fetch(url) as resp:
|
|
status = resp.status
|
|
if status == 429:
|
|
log(" Rate limited, skipping")
|
|
continue
|
|
if status >= 400:
|
|
log(f" HTTP {status}")
|
|
continue
|
|
|
|
body = resp.read().decode()
|
|
data = json.loads(body)
|
|
|
|
# Unwrap {"version": "...", "data": {...}} envelope
|
|
if isinstance(data, dict) and "data" in data and "version" in data:
|
|
data = data["data"]
|
|
|
|
# Response can be an array [trackObj, streamInfo] or an object
|
|
if isinstance(data, list) and len(data) >= 2:
|
|
track_obj = data[0]
|
|
stream_info = data[1]
|
|
elif isinstance(data, dict):
|
|
track_obj = data
|
|
stream_info = data
|
|
else:
|
|
log(f" Unexpected response format: {type(data)}")
|
|
continue
|
|
|
|
# Check for OriginalTrackUrl first (direct URL, no decoding needed)
|
|
original_url = None
|
|
if isinstance(stream_info, dict):
|
|
original_url = stream_info.get("OriginalTrackUrl") or stream_info.get("originalTrackUrl")
|
|
if isinstance(data, dict):
|
|
original_url = original_url or data.get("originalTrackUrl") or data.get("OriginalTrackUrl")
|
|
|
|
if original_url:
|
|
log(f"[+] Got direct stream URL from {base}")
|
|
return original_url, track_obj
|
|
|
|
# Fall back to manifest decoding
|
|
manifest = None
|
|
if isinstance(stream_info, dict):
|
|
manifest = stream_info.get("manifest")
|
|
if isinstance(data, dict):
|
|
manifest = manifest or data.get("manifest")
|
|
|
|
if manifest:
|
|
stream_url = extract_stream_url_from_manifest(manifest, log=log)
|
|
if stream_url:
|
|
log(f"[+] Got stream URL from manifest ({base})")
|
|
return stream_url, track_obj
|
|
else:
|
|
log(" Could not extract URL from manifest")
|
|
continue
|
|
|
|
# Maybe the response itself contains a URL
|
|
if isinstance(data, dict):
|
|
for key in ("url", "streamUrl", "stream_url"):
|
|
if data.get(key):
|
|
log(f"[+] Got stream URL from {base}")
|
|
return data[key], track_obj
|
|
|
|
log(" No stream URL found in response")
|
|
if isinstance(data, dict):
|
|
log(f" Response keys: {list(data.keys())}")
|
|
elif isinstance(data, list):
|
|
for i, item in enumerate(data):
|
|
if isinstance(item, dict):
|
|
log(f" [{i}] keys: {list(item.keys())}")
|
|
|
|
except json.JSONDecodeError as e:
|
|
log(f" Invalid JSON: {e}")
|
|
except Exception as e:
|
|
log(f" Failed: {e}")
|
|
continue
|
|
|
|
return None, None
|
|
|
|
|
|
def get_stream_url_qobuz(track_id, quality, log=None):
|
|
"""Try Qobuz API as an alternative source."""
|
|
if log is None:
|
|
log = print
|
|
qobuz_quality_map = {
|
|
"MP3_320": "27",
|
|
"LOSSLESS": "7",
|
|
"HI_RES_LOSSLESS": "5",
|
|
}
|
|
q = qobuz_quality_map.get(quality, "5")
|
|
|
|
url = f"{QOBUZ_API}/download-music?track_id={track_id}&quality={q}"
|
|
log("[*] Trying Qobuz API ...")
|
|
try:
|
|
data = fetch_json(url)
|
|
if isinstance(data, dict):
|
|
if data.get("success") and isinstance(data.get("data"), dict):
|
|
stream_url = data["data"].get("url")
|
|
if stream_url:
|
|
log("[+] Got stream URL from Qobuz")
|
|
return stream_url
|
|
elif data.get("url"):
|
|
log("[+] Got stream URL from Qobuz")
|
|
return data["url"]
|
|
except Exception as e:
|
|
log(f" Qobuz failed: {e}")
|
|
|
|
return None
|
|
|
|
|
|
def get_track_info(instances, track_id, log=None):
|
|
"""Fetch track metadata for filename."""
|
|
if log is None:
|
|
log = print
|
|
shuffled = list(instances)
|
|
random.shuffle(shuffled)
|
|
for base in shuffled:
|
|
for endpoint in [f"/info/?id={track_id}", f"/track/?id={track_id}&quality=LOSSLESS"]:
|
|
try:
|
|
data = fetch_json(f"{base}{endpoint}")
|
|
# Unwrap version/data envelope
|
|
if isinstance(data, dict) and "data" in data and "version" in data:
|
|
data = data["data"]
|
|
if isinstance(data, list) and data:
|
|
data = data[0]
|
|
if isinstance(data, dict) and data.get("title"):
|
|
return data
|
|
except Exception:
|
|
continue
|
|
return None
|
|
|
|
|
|
def download_file(url, output_path, log=None):
|
|
"""Download a file with progress display."""
|
|
if log is None:
|
|
log = print
|
|
with fetch(url, timeout=120) as resp:
|
|
total = resp.headers.get("Content-Length")
|
|
total = int(total) if total else None
|
|
|
|
with open(output_path, "wb") as f:
|
|
downloaded = 0
|
|
last_pct = -1
|
|
while True:
|
|
chunk = resp.read(8192)
|
|
if not chunk:
|
|
break
|
|
f.write(chunk)
|
|
downloaded += len(chunk)
|
|
if total:
|
|
pct = int(downloaded / total * 100)
|
|
# Only log every 10% to avoid flooding
|
|
if pct // 10 > last_pct // 10:
|
|
mb = downloaded / (1024 * 1024)
|
|
total_mb = total / (1024 * 1024)
|
|
log(f"[*] Downloading: {mb:.1f}/{total_mb:.1f} MB ({pct}%)")
|
|
last_pct = pct
|
|
|
|
log(f"[+] Saved to {output_path}")
|
|
|
|
|
|
def fetch_cover_art(album_info, log=None):
|
|
"""Download album cover art from Tidal CDN. Returns JPEG bytes or None."""
|
|
if log is None:
|
|
log = print
|
|
if not album_info or not isinstance(album_info, dict):
|
|
return None
|
|
cover_id = album_info.get("cover")
|
|
if not cover_id:
|
|
return None
|
|
# Tidal CDN uses slashes instead of dashes in the cover UUID
|
|
formatted = cover_id.replace("-", "/")
|
|
url = f"https://resources.tidal.com/images/{formatted}/1280x1280.jpg"
|
|
log("[*] Fetching album art ...")
|
|
try:
|
|
with fetch(url) as resp:
|
|
return resp.read()
|
|
except Exception as e:
|
|
log(f"[!] Could not fetch cover art: {e}")
|
|
return None
|
|
|
|
|
|
def embed_metadata(file_path, info, cover_data=None, log=None):
|
|
"""Embed metadata and cover art into audio file using mutagen."""
|
|
if log is None:
|
|
log = print
|
|
if not info:
|
|
return
|
|
|
|
ext = os.path.splitext(file_path)[1].lower()
|
|
title = info.get("title", "")
|
|
artist_obj = info.get("artist", {})
|
|
artist_name = artist_obj.get("name", "") if isinstance(artist_obj, dict) else str(artist_obj)
|
|
# Join all artists for the artist tag
|
|
artists = info.get("artists", [])
|
|
all_artists = ", ".join(a.get("name", "") for a in artists if isinstance(a, dict)) if artists else artist_name
|
|
album_obj = info.get("album", {})
|
|
album_title = album_obj.get("title", "") if isinstance(album_obj, dict) else ""
|
|
track_num = info.get("trackNumber")
|
|
disc_num = info.get("volumeNumber")
|
|
copyright_text = info.get("copyright", "")
|
|
isrc = info.get("isrc", "")
|
|
release_date = info.get("streamStartDate", "")
|
|
year = release_date[:4] if release_date and len(release_date) >= 4 else ""
|
|
|
|
log("[*] Embedding metadata ...")
|
|
|
|
try:
|
|
if ext == ".flac":
|
|
from mutagen.flac import FLAC, Picture
|
|
audio = FLAC(file_path)
|
|
audio["title"] = title
|
|
audio["artist"] = all_artists
|
|
audio["album"] = album_title
|
|
if track_num:
|
|
audio["tracknumber"] = str(track_num)
|
|
if disc_num:
|
|
audio["discnumber"] = str(disc_num)
|
|
if copyright_text:
|
|
audio["copyright"] = copyright_text
|
|
if isrc:
|
|
audio["isrc"] = isrc
|
|
if year:
|
|
audio["date"] = year
|
|
if cover_data:
|
|
pic = Picture()
|
|
pic.type = 3 # front cover
|
|
pic.mime = "image/jpeg"
|
|
pic.data = cover_data
|
|
audio.clear_pictures()
|
|
audio.add_picture(pic)
|
|
audio.save()
|
|
log("[+] Metadata embedded in FLAC")
|
|
|
|
elif ext == ".mp3":
|
|
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK, TPOS, TCOP, TDRC, TSRC, APIC
|
|
try:
|
|
audio = ID3(file_path)
|
|
except Exception:
|
|
from mutagen.id3 import ID3NoHeaderError
|
|
audio = ID3()
|
|
audio.add(TIT2(encoding=3, text=title))
|
|
audio.add(TPE1(encoding=3, text=all_artists))
|
|
audio.add(TALB(encoding=3, text=album_title))
|
|
if track_num:
|
|
audio.add(TRCK(encoding=3, text=str(track_num)))
|
|
if disc_num:
|
|
audio.add(TPOS(encoding=3, text=str(disc_num)))
|
|
if copyright_text:
|
|
audio.add(TCOP(encoding=3, text=copyright_text))
|
|
if year:
|
|
audio.add(TDRC(encoding=3, text=year))
|
|
if isrc:
|
|
audio.add(TSRC(encoding=3, text=isrc))
|
|
if cover_data:
|
|
audio.add(APIC(encoding=3, mime="image/jpeg", type=3, desc="Cover", data=cover_data))
|
|
audio.save(file_path)
|
|
log("[+] Metadata embedded in MP3")
|
|
|
|
elif ext == ".m4a":
|
|
from mutagen.mp4 import MP4, MP4Cover
|
|
audio = MP4(file_path)
|
|
audio["\xa9nam"] = [title]
|
|
audio["\xa9ART"] = [all_artists]
|
|
audio["\xa9alb"] = [album_title]
|
|
if track_num:
|
|
audio["trkn"] = [(track_num, 0)]
|
|
if disc_num:
|
|
audio["disk"] = [(disc_num, 0)]
|
|
if copyright_text:
|
|
audio["cprt"] = [copyright_text]
|
|
if year:
|
|
audio["\xa9day"] = [year]
|
|
if cover_data:
|
|
audio["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)]
|
|
audio.save()
|
|
log("[+] Metadata embedded in M4A")
|
|
|
|
except Exception as e:
|
|
log(f"[!] Failed to embed metadata: {e}")
|
|
|
|
|
|
|
|
def convert_to_mp3(input_path, output_path, bitrate="320k", log=None):
|
|
"""Convert audio file to MP3 using ffmpeg."""
|
|
if log is None:
|
|
log = print
|
|
ffmpeg = shutil.which("ffmpeg")
|
|
if not ffmpeg:
|
|
log("[!] ffmpeg not found in PATH. Install ffmpeg to enable MP3 conversion.")
|
|
log(f"[*] Keeping FLAC file: {input_path}")
|
|
return False
|
|
|
|
log(f"[*] Converting to MP3 ({bitrate}) ...")
|
|
try:
|
|
subprocess.run(
|
|
[ffmpeg, "-i", input_path, "-ab", bitrate, "-map_metadata", "0", "-y", output_path],
|
|
check=True, capture_output=True,
|
|
)
|
|
os.remove(input_path)
|
|
log(f"[+] Converted to {output_path}")
|
|
return True
|
|
except subprocess.CalledProcessError as e:
|
|
log(f"[!] ffmpeg conversion failed: {e.stderr.decode()[:200]}")
|
|
log(f"[*] Keeping FLAC file: {input_path}")
|
|
return False
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Download a song from Monochrome")
|
|
parser.add_argument("track_id", help="Track ID (from the URL, e.g. 12345678)")
|
|
parser.add_argument("--quality", default="HI_RES_LOSSLESS",
|
|
choices=["HI_RES_LOSSLESS", "LOSSLESS", "HIGH", "LOW", "MP3_320"],
|
|
help="Audio quality (default: HI_RES_LOSSLESS)")
|
|
parser.add_argument("--output", "-o", help="Output filename (auto-detected if omitted)")
|
|
args = parser.parse_args()
|
|
|
|
want_mp3 = args.quality == "MP3_320"
|
|
# MP3_320 isn't a real API quality — download as LOSSLESS then convert
|
|
api_quality = "LOSSLESS" if want_mp3 else args.quality
|
|
|
|
instances = discover_instances()
|
|
|
|
# Try to get track metadata for a nice filename
|
|
print(f"[*] Fetching metadata for track {args.track_id} ...")
|
|
info = get_track_info(instances, args.track_id)
|
|
if info:
|
|
title = info.get("title", args.track_id)
|
|
artist = info.get("artist", {})
|
|
if isinstance(artist, dict):
|
|
artist = artist.get("name", "Unknown")
|
|
print(f"[*] Track: {artist} - {title}")
|
|
else:
|
|
print("[*] Could not fetch metadata (will use track ID for filename)")
|
|
|
|
# Get the stream URL — try Tidal instances first, then Qobuz
|
|
print(f"[*] Requesting stream (quality={api_quality}) ...")
|
|
stream_url, track_data = get_stream_url_tidal(instances, args.track_id, api_quality)
|
|
|
|
if not stream_url:
|
|
print("[*] Tidal instances failed, trying Qobuz ...")
|
|
stream_url = get_stream_url_qobuz(args.track_id, api_quality)
|
|
|
|
if not stream_url:
|
|
print("[!] Could not get a stream URL from any source.")
|
|
sys.exit(1)
|
|
|
|
# Merge metadata from track_data if we didn't get it earlier
|
|
if not info and track_data and isinstance(track_data, dict):
|
|
info = track_data
|
|
|
|
# Build filename helper
|
|
def make_filename(ext):
|
|
if args.output:
|
|
return args.output
|
|
if info and info.get("title"):
|
|
t = info.get("title", args.track_id)
|
|
a = info.get("artist", {})
|
|
if isinstance(a, dict):
|
|
a = a.get("name", "Unknown")
|
|
elif not isinstance(a, str):
|
|
a = "Unknown"
|
|
return sanitize_filename(f"{a} - {t}{ext}")
|
|
return f"{args.track_id}{ext}"
|
|
|
|
# Determine extensions based on quality
|
|
if want_mp3:
|
|
flac_path = make_filename(".flac")
|
|
mp3_path = make_filename(".mp3")
|
|
elif api_quality in ("HIGH", "LOW"):
|
|
flac_path = make_filename(".m4a")
|
|
mp3_path = None
|
|
else:
|
|
flac_path = make_filename(".flac")
|
|
mp3_path = None
|
|
|
|
# Fetch cover art
|
|
cover_data = None
|
|
if info:
|
|
cover_data = fetch_cover_art(info.get("album"))
|
|
|
|
# Download
|
|
print(f"[*] Stream URL: {stream_url[:100]}...")
|
|
download_file(stream_url, flac_path)
|
|
|
|
# Embed metadata into the downloaded file
|
|
embed_metadata(flac_path, info, cover_data)
|
|
|
|
# Convert to MP3 if requested
|
|
if want_mp3:
|
|
convert_to_mp3(flac_path, mp3_path)
|
|
# Re-embed metadata into MP3 (ffmpeg may not carry everything over)
|
|
embed_metadata(mp3_path, info, cover_data)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Allow running as standalone script
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
main()
|