# Files
# trackpull/monochrome/download.py
#
# 483 lines
# 17 KiB
# Python

"""
Monochrome song downloader.

Usage:
    python download.py <track_id> [--quality QUALITY] [--output FILENAME]

Track IDs can be found in the URL when viewing a track on the site, e.g.:
    https://monochrome.app/track/12345678 -> track_id = 12345678

Quality options:
    HI_RES_LOSSLESS  (default, highest available)
    LOSSLESS         (16-bit/44.1kHz FLAC)
    HIGH             (AAC 320kbps)
    LOW              (AAC 96kbps)
    MP3_320          (downloads as LOSSLESS, converts to MP3 via ffmpeg)

The output filename can also be given with the short option -o.
"""
import argparse
import base64
import json
import os
import random
import re
import shutil
import subprocess
import sys
from monochrome import fetch, fetch_json, discover_instances, SSL_CTX, QOBUZ_API
def extract_stream_url_from_manifest(manifest_b64, log=None):
    """Decode a base64 manifest and extract a stream URL (mirrors frontend logic).

    Three payload shapes are recognised, tried in this order:

    1. DASH XML (contains ``<MPD``): the first absolute ``<BaseURL>``.
    2. JSON object: the first string entry of ``urls``, else ``url``.
    3. Anything else: the first ``http(s)://`` URL found in the text.

    Args:
        manifest_b64: Base64-encoded manifest (``str`` or ``bytes``).
        log: Callable taking one message string; defaults to ``print``.

    Returns:
        The extracted URL as a string, or ``None`` when the manifest cannot
        be decoded or contains no usable URL.
    """
    # Function-scope import keeps this block self-contained.
    import html

    if log is None:
        log = print
    try:
        decoded = base64.b64decode(manifest_b64).decode("utf-8", errors="replace")
    except (TypeError, ValueError):
        # binascii.Error is a ValueError; TypeError covers None / non-bytes input.
        return None

    # Type 1: DASH manifest XML — extract BaseURL
    if "<MPD" in decoded:
        match = re.search(r"<BaseURL>\s*(https?://[^<]+?)\s*</BaseURL>", decoded)
        if match:
            log("[*] Extracted URL from DASH manifest")
            # XML text escapes '&' as '&amp;'; undo that so query strings
            # (e.g. signed CDN tokens) reach the server intact.
            return html.unescape(match.group(1))
        log("[!] DASH manifest found but no BaseURL — DASH streaming not supported in CLI")
        return None

    # Type 2: JSON with urls array (or a single url field)
    try:
        parsed = json.loads(decoded)
    except ValueError:  # json.JSONDecodeError is a ValueError
        parsed = None
    if isinstance(parsed, dict):
        urls = parsed.get("urls")
        if isinstance(urls, list):
            first = next((u for u in urls if isinstance(u, str) and u), None)
            if first:
                log("[*] Extracted URL from JSON manifest")
                return first
        url = parsed.get("url")
        if isinstance(url, str) and url:
            log("[*] Extracted URL from JSON manifest")
            return url

    # Type 3: Raw URL in the decoded string
    match = re.search(r"https?://[\w\-.~:?#\[@!$&'()*+,;=%/]+", decoded)
    if match:
        log("[*] Extracted URL from raw manifest")
        return match.group(0)
    return None
def get_stream_url_tidal(instances, track_id, quality, log=None):
    """Ask Tidal-based instances (in random order) for a stream URL via /track/.

    For each instance the response is unwrapped from an optional
    ``{"version": ..., "data": ...}`` envelope and then searched, in order,
    for a direct ``OriginalTrackUrl``, a base64 ``manifest``, and finally a
    plain ``url`` / ``streamUrl`` / ``stream_url`` field.

    Args:
        instances: Iterable of instance base URLs.
        track_id: Track identifier placed in the query string.
        quality: Quality name placed in the query string.
        log: Callable taking one message string; defaults to ``print``.

    Returns:
        ``(stream_url, track_obj)`` from the first instance that yields a
        URL, or ``(None, None)`` when every instance fails.
    """
    log = print if log is None else log
    candidates = list(instances)
    random.shuffle(candidates)

    for base in candidates:
        request_url = f"{base}/track/?id={track_id}&quality={quality}"
        log(f"[*] Trying {base} ...")
        try:
            with fetch(request_url) as resp:
                status = resp.status
                if status == 429:
                    log(" Rate limited, skipping")
                    continue
                if status >= 400:
                    log(f" HTTP {status}")
                    continue
                raw = resp.read().decode()
            payload = json.loads(raw)

            # Strip the {"version": "...", "data": {...}} envelope when present.
            if isinstance(payload, dict) and "data" in payload and "version" in payload:
                payload = payload["data"]

            # The payload is either one object or a [trackObj, streamInfo] pair.
            payload_is_dict = isinstance(payload, dict)
            if payload_is_dict:
                track_obj = stream_info = payload
            elif isinstance(payload, list) and len(payload) >= 2:
                track_obj, stream_info = payload[:2]
            else:
                log(f" Unexpected response format: {type(payload)}")
                continue
            info_is_dict = isinstance(stream_info, dict)

            # Preferred: a direct URL that needs no decoding.
            direct_url = None
            if info_is_dict:
                direct_url = stream_info.get("OriginalTrackUrl") or stream_info.get("originalTrackUrl")
            if payload_is_dict:
                direct_url = direct_url or payload.get("originalTrackUrl") or payload.get("OriginalTrackUrl")
            if direct_url:
                log(f"[+] Got direct stream URL from {base}")
                return direct_url, track_obj

            # Next: decode the manifest, if the response carries one.
            manifest = stream_info.get("manifest") if info_is_dict else None
            if payload_is_dict:
                manifest = manifest or payload.get("manifest")
            if manifest:
                stream_url = extract_stream_url_from_manifest(manifest, log=log)
                if not stream_url:
                    log(" Could not extract URL from manifest")
                    continue
                log(f"[+] Got stream URL from manifest ({base})")
                return stream_url, track_obj

            # Last resort: a URL stored directly on the response object.
            if payload_is_dict:
                hit = next(
                    (key for key in ("url", "streamUrl", "stream_url") if payload.get(key)),
                    None,
                )
                if hit is not None:
                    log(f"[+] Got stream URL from {base}")
                    return payload[hit], track_obj

            # Nothing usable: dump the response shape to help debugging.
            log(" No stream URL found in response")
            if payload_is_dict:
                log(f" Response keys: {list(payload.keys())}")
            elif isinstance(payload, list):
                for idx, entry in enumerate(payload):
                    if isinstance(entry, dict):
                        log(f" [{idx}] keys: {list(entry.keys())}")
        except json.JSONDecodeError as exc:
            log(f" Invalid JSON: {exc}")
        except Exception as exc:
            log(f" Failed: {exc}")

    return None, None
def get_stream_url_qobuz(track_id, quality, log=None):
    """Query the Qobuz download endpoint as a fallback stream source.

    Args:
        track_id: Track identifier placed in the query string.
        quality: Quality name; translated to a numeric code, with unknown
            names falling back to ``"5"``.
        log: Callable taking one message string; defaults to ``print``.

    Returns:
        The stream URL, or ``None`` when the request fails or the response
        carries no URL.
    """
    log = print if log is None else log

    # NOTE(review): Qobuz format ids are commonly documented as 5 = MP3 320
    # and 27 = highest hi-res FLAC, which is the reverse of this table —
    # confirm against the QOBUZ_API backend before relying on these codes.
    quality_codes = {
        "MP3_320": "27",
        "LOSSLESS": "7",
        "HI_RES_LOSSLESS": "5",
    }
    code = quality_codes.get(quality, "5")
    endpoint = f"{QOBUZ_API}/download-music?track_id={track_id}&quality={code}"
    log("[*] Trying Qobuz API ...")

    try:
        payload = fetch_json(endpoint)
        if not isinstance(payload, dict):
            return None
        nested = payload.get("data")
        # Either {"success": true, "data": {"url": ...}} or a top-level "url".
        if payload.get("success") and isinstance(nested, dict):
            found = nested.get("url")
        else:
            found = payload.get("url")
        if found:
            log("[+] Got stream URL from Qobuz")
            return found
    except Exception as exc:
        log(f" Qobuz failed: {exc}")
    return None
def get_track_info(instances, track_id, log=None):
    """Look up track metadata (used for the filename and tags).

    Instances are tried in random order; on each one the ``/info/`` endpoint
    is queried first, then ``/track/`` at LOSSLESS quality. Failures are
    skipped silently.

    Args:
        instances: Iterable of instance base URLs.
        track_id: Track identifier placed in the query string.
        log: Accepted for signature parity with the other helpers; this
            function does not emit any messages.

    Returns:
        The first response object that has a truthy ``title``, or ``None``.
    """
    log = print if log is None else log
    order = list(instances)
    random.shuffle(order)
    paths = (f"/info/?id={track_id}", f"/track/?id={track_id}&quality=LOSSLESS")

    for base in order:
        for path in paths:
            try:
                payload = fetch_json(f"{base}{path}")
                # Strip the {"version": ..., "data": ...} envelope if present.
                if isinstance(payload, dict) and "data" in payload and "version" in payload:
                    payload = payload["data"]
                # A list response carries the track object first.
                if isinstance(payload, list) and payload:
                    payload = payload[0]
                if isinstance(payload, dict) and payload.get("title"):
                    return payload
            except Exception:
                # Best effort: move on to the next endpoint / instance.
                continue
    return None
def download_file(url, output_path, log=None):
    """Download *url* to *output_path*, logging progress about every 10%.

    The body is streamed into ``<output_path>.part`` and only moved onto
    *output_path* once it has been read completely. A failed or interrupted
    transfer therefore never leaves a truncated file at *output_path* and
    never clobbers an existing file there.

    Args:
        url: Address to download.
        output_path: Destination file path (overwritten on success).
        log: Callable taking one message string; defaults to ``print``.

    Raises:
        Whatever ``fetch`` or the file operations raise; the partial file is
        removed before the exception propagates.
    """
    if log is None:
        log = print
    partial_path = f"{output_path}.part"
    try:
        with fetch(url, timeout=120) as resp:
            # A missing, malformed or non-positive Content-Length only
            # disables progress output; it must not abort the download.
            try:
                total = int(resp.headers.get("Content-Length"))
            except (TypeError, ValueError):
                total = None
            if total is not None and total <= 0:
                total = None

            with open(partial_path, "wb") as f:
                downloaded = 0
                last_pct = -1
                while True:
                    chunk = resp.read(8192)
                    if not chunk:
                        break
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total:
                        pct = int(downloaded / total * 100)
                        # Only log every 10% to avoid flooding
                        if pct // 10 > last_pct // 10:
                            mb = downloaded / (1024 * 1024)
                            total_mb = total / (1024 * 1024)
                            log(f"[*] Downloading: {mb:.1f}/{total_mb:.1f} MB ({pct}%)")
                            last_pct = pct
        # Replace the destination in one step now that the data is complete.
        os.replace(partial_path, output_path)
    except BaseException:
        # Includes KeyboardInterrupt: tidy up the partial file, then re-raise.
        try:
            os.remove(partial_path)
        except OSError:
            pass  # nothing was written, or it is already gone
        raise
    log(f"[+] Saved to {output_path}")
def fetch_cover_art(album_info, log=None):
    """Fetch the 1280x1280 album cover from the Tidal image host.

    Args:
        album_info: Album object; its ``cover`` value identifies the image.
        log: Callable taking one message string; defaults to ``print``.

    Returns:
        The raw image bytes, or ``None`` when there is no cover id or the
        request fails.
    """
    log = print if log is None else log

    cover_id = album_info.get("cover") if isinstance(album_info, dict) else None
    if not cover_id:
        return None

    # The image path uses "/" where the cover id has "-".
    image_path = "/".join(cover_id.split("-"))
    log("[*] Fetching album art ...")
    try:
        with fetch(f"https://resources.tidal.com/images/{image_path}/1280x1280.jpg") as resp:
            return resp.read()
    except Exception as exc:
        log(f"[!] Could not fetch cover art: {exc}")
        return None
def _tag_text(value):
    """Return *value* if it is a string, else "" (guards against JSON nulls)."""
    return value if isinstance(value, str) else ""


def _collect_tags(info):
    """Flatten the track-info dict into plain tag values."""
    artist_obj = info.get("artist")
    if isinstance(artist_obj, dict):
        primary_artist = _tag_text(artist_obj.get("name"))
    else:
        primary_artist = _tag_text(artist_obj)

    # Join all credited artists; fall back to the primary artist when the
    # list is missing or holds no usable names.
    artists = info.get("artists")
    names = []
    if isinstance(artists, (list, tuple)):
        names = [
            a["name"] for a in artists
            if isinstance(a, dict) and isinstance(a.get("name"), str) and a["name"]
        ]

    album_obj = info.get("album")
    release_date = _tag_text(info.get("streamStartDate"))
    return {
        "title": _tag_text(info.get("title")),
        "artist": ", ".join(names) or primary_artist,
        "album": _tag_text(album_obj.get("title")) if isinstance(album_obj, dict) else "",
        "track": info.get("trackNumber"),
        "disc": info.get("volumeNumber"),
        "copyright": _tag_text(info.get("copyright")),
        "isrc": _tag_text(info.get("isrc")),
        "year": release_date[:4] if len(release_date) >= 4 else "",
    }


def _write_flac_tags(file_path, tags, cover_data):
    """Write Vorbis comments and an optional front-cover picture to a FLAC file."""
    from mutagen.flac import FLAC, Picture

    audio = FLAC(file_path)
    audio["title"] = tags["title"]
    audio["artist"] = tags["artist"]
    audio["album"] = tags["album"]
    if tags["track"]:
        audio["tracknumber"] = str(tags["track"])
    if tags["disc"]:
        audio["discnumber"] = str(tags["disc"])
    if tags["copyright"]:
        audio["copyright"] = tags["copyright"]
    if tags["isrc"]:
        audio["isrc"] = tags["isrc"]
    if tags["year"]:
        audio["date"] = tags["year"]
    if cover_data:
        pic = Picture()
        pic.type = 3  # front cover
        pic.mime = "image/jpeg"
        pic.data = cover_data
        audio.clear_pictures()
        audio.add_picture(pic)
    audio.save()


def _write_mp3_tags(file_path, tags, cover_data):
    """Write ID3 frames and an optional front-cover image to an MP3 file."""
    from mutagen.id3 import (
        APIC, ID3, ID3NoHeaderError, TALB, TCOP, TDRC, TIT2, TPE1, TPOS, TRCK, TSRC,
    )

    try:
        audio = ID3(file_path)
    except ID3NoHeaderError:
        # Only a missing tag header justifies starting from an empty tag.
        # Any other error (e.g. the file does not exist) must surface
        # instead of saving a tag into a bogus file.
        audio = ID3()
    audio.add(TIT2(encoding=3, text=tags["title"]))
    audio.add(TPE1(encoding=3, text=tags["artist"]))
    audio.add(TALB(encoding=3, text=tags["album"]))
    if tags["track"]:
        audio.add(TRCK(encoding=3, text=str(tags["track"])))
    if tags["disc"]:
        audio.add(TPOS(encoding=3, text=str(tags["disc"])))
    if tags["copyright"]:
        audio.add(TCOP(encoding=3, text=tags["copyright"]))
    if tags["year"]:
        audio.add(TDRC(encoding=3, text=tags["year"]))
    if tags["isrc"]:
        audio.add(TSRC(encoding=3, text=tags["isrc"]))
    if cover_data:
        audio.add(APIC(encoding=3, mime="image/jpeg", type=3, desc="Cover", data=cover_data))
    audio.save(file_path)


def _write_m4a_tags(file_path, tags, cover_data):
    """Write MP4 atoms and an optional cover image to an M4A file."""
    from mutagen.mp4 import MP4, MP4Cover

    audio = MP4(file_path)
    audio["\xa9nam"] = [tags["title"]]
    audio["\xa9ART"] = [tags["artist"]]
    audio["\xa9alb"] = [tags["album"]]
    if tags["track"]:
        # int() so a numeric string from the API is still accepted.
        audio["trkn"] = [(int(tags["track"]), 0)]
    if tags["disc"]:
        audio["disk"] = [(int(tags["disc"]), 0)]
    if tags["copyright"]:
        audio["cprt"] = [tags["copyright"]]
    if tags["year"]:
        audio["\xa9day"] = [tags["year"]]
    if cover_data:
        audio["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)]
    audio.save()


def embed_metadata(file_path, info, cover_data=None, log=None):
    """Embed metadata and cover art into an audio file using mutagen.

    The tag format is chosen from the file extension (``.flac``, ``.mp3`` or
    ``.m4a``); any other extension is left untouched. Failures while tagging
    (including mutagen not being installed) are logged, never raised.

    Args:
        file_path: Path of the audio file to tag.
        info: Track-info dict (``title``, ``artist``, ``artists``, ``album``,
            ``trackNumber``, ``volumeNumber``, ``copyright``, ``isrc``,
            ``streamStartDate``). Nothing happens if it is empty or not a dict.
        cover_data: JPEG bytes for the front cover, or ``None``.
        log: Callable taking one message string; defaults to ``print``.
    """
    if log is None:
        log = print
    if not info or not isinstance(info, dict):
        return

    ext = os.path.splitext(file_path)[1].lower()
    tags = _collect_tags(info)
    log("[*] Embedding metadata ...")

    writers = {
        ".flac": (_write_flac_tags, "FLAC"),
        ".mp3": (_write_mp3_tags, "MP3"),
        ".m4a": (_write_m4a_tags, "M4A"),
    }
    if ext not in writers:
        return
    write_tags, label = writers[ext]
    try:
        write_tags(file_path, tags, cover_data)
        log(f"[+] Metadata embedded in {label}")
    except Exception as e:
        # Tagging is best effort: the audio file itself is already saved.
        log(f"[!] Failed to embed metadata: {e}")
# Every character in this set is replaced by an underscore.
_FILENAME_UNSAFE = str.maketrans(dict.fromkeys('<>:"/\\|?*', "_"))


def sanitize_filename(name):
    """Return *name* with each of ``< > : " / \\ | ? *`` replaced by ``_``."""
    return name.translate(_FILENAME_UNSAFE)
def convert_to_mp3(input_path, output_path, bitrate="320k", log=None):
    """Convert an audio file to MP3 using ffmpeg, removing the source on success.

    Args:
        input_path: Source audio file (deleted after a successful conversion).
        output_path: Destination MP3 path (overwritten if it exists).
        bitrate: Audio bitrate passed to ffmpeg, e.g. ``"320k"``.
        log: Callable taking one message string; defaults to ``print``.

    Returns:
        ``True`` if the MP3 was written, ``False`` otherwise. On failure the
        source file is kept.
    """
    if log is None:
        log = print

    # ffmpeg cannot read and write the same file, and deleting the "source"
    # afterwards would delete the result as well.
    if os.path.abspath(input_path) == os.path.abspath(output_path):
        log("[!] Input and output paths are identical; skipping MP3 conversion.")
        return False

    ffmpeg = shutil.which("ffmpeg")
    if not ffmpeg:
        log("[!] ffmpeg not found in PATH. Install ffmpeg to enable MP3 conversion.")
        log(f"[*] Keeping FLAC file: {input_path}")
        return False

    log(f"[*] Converting to MP3 ({bitrate}) ...")
    try:
        subprocess.run(
            [ffmpeg, "-i", input_path, "-b:a", bitrate, "-map_metadata", "0", "-y", output_path],
            check=True, capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        # ffmpeg prints its banner first and the actual error last, so show
        # the tail; decode leniently because stderr may not be valid UTF-8.
        detail = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        log(f"[!] ffmpeg conversion failed: {detail[-200:]}")
        log(f"[*] Keeping FLAC file: {input_path}")
        return False
    except OSError as e:
        # The executable was found but could not be started.
        log(f"[!] ffmpeg conversion failed: {e}")
        log(f"[*] Keeping FLAC file: {input_path}")
        return False

    try:
        os.remove(input_path)
    except OSError as e:
        # The MP3 exists, so failing to delete the source is only a warning.
        log(f"[!] Could not remove {input_path}: {e}")
    log(f"[+] Converted to {output_path}")
    return True
def main():
    """Command-line entry point: resolve, download, tag and optionally convert a track.

    Steps: parse arguments, discover instances, fetch metadata, obtain a
    stream URL (Tidal instances first, then Qobuz), download the audio,
    embed tags and cover art, and for ``MP3_320`` convert the download to
    MP3 with ffmpeg. Exits with status 1 when no stream URL can be obtained.
    """
    parser = argparse.ArgumentParser(description="Download a song from Monochrome")
    parser.add_argument("track_id", help="Track ID (from the URL, e.g. 12345678)")
    parser.add_argument("--quality", default="HI_RES_LOSSLESS",
                        choices=["HI_RES_LOSSLESS", "LOSSLESS", "HIGH", "LOW", "MP3_320"],
                        help="Audio quality (default: HI_RES_LOSSLESS)")
    parser.add_argument("--output", "-o", help="Output filename (auto-detected if omitted)")
    args = parser.parse_args()

    want_mp3 = args.quality == "MP3_320"
    # MP3_320 isn't a real API quality — download as LOSSLESS then convert
    api_quality = "LOSSLESS" if want_mp3 else args.quality

    def artist_label(meta):
        """Artist name for display and filenames; "Unknown" if absent or null."""
        artist = meta.get("artist")
        if isinstance(artist, dict):
            artist = artist.get("name")
        return artist if isinstance(artist, str) and artist else "Unknown"

    instances = discover_instances()

    # Try to get track metadata for a nice filename
    print(f"[*] Fetching metadata for track {args.track_id} ...")
    info = get_track_info(instances, args.track_id)
    if info:
        title = info.get("title") or args.track_id
        print(f"[*] Track: {artist_label(info)} - {title}")
    else:
        print("[*] Could not fetch metadata (will use track ID for filename)")

    # Get the stream URL — try Tidal instances first, then Qobuz
    print(f"[*] Requesting stream (quality={api_quality}) ...")
    stream_url, track_data = get_stream_url_tidal(instances, args.track_id, api_quality)
    if not stream_url:
        print("[*] Tidal instances failed, trying Qobuz ...")
        stream_url = get_stream_url_qobuz(args.track_id, api_quality)
    if not stream_url:
        print("[!] Could not get a stream URL from any source.")
        sys.exit(1)

    # Merge metadata from track_data if we didn't get it earlier
    if not info and track_data and isinstance(track_data, dict):
        info = track_data

    # Build filename helper
    def make_filename(ext):
        if args.output:
            return args.output
        if info and info.get("title"):
            return sanitize_filename(f"{artist_label(info)} - {info['title']}{ext}")
        return f"{args.track_id}{ext}"

    # Determine the download path (and the MP3 target, if converting)
    mp3_path = None
    if want_mp3:
        mp3_path = make_filename(".mp3")
        if args.output:
            # --output names the final MP3, so the intermediate FLAC needs a
            # different path; otherwise ffmpeg would read and write one file.
            root, ext = os.path.splitext(args.output)
            suffix = ".tmp.flac" if ext.lower() == ".flac" else ".flac"
            download_path = f"{root}{suffix}"
        else:
            download_path = make_filename(".flac")
    elif api_quality in ("HIGH", "LOW"):
        download_path = make_filename(".m4a")
    else:
        download_path = make_filename(".flac")

    # Fetch cover art
    cover_data = None
    if info:
        cover_data = fetch_cover_art(info.get("album"))

    # Download
    print(f"[*] Stream URL: {stream_url[:100]}...")
    download_file(stream_url, download_path)

    # Embed metadata into the downloaded file
    embed_metadata(download_path, info, cover_data)

    # Convert to MP3 if requested. Tag the MP3 only when conversion actually
    # produced it (ffmpeg may not carry every tag over).
    if want_mp3 and convert_to_mp3(download_path, mp3_path):
        embed_metadata(mp3_path, info, cover_data)
if __name__ == "__main__":
    # Standalone-script support: put the directory two levels above this file
    # at the front of sys.path, then run the CLI.
    # NOTE(review): the module-level `from monochrome import ...` runs before
    # this line, so confirm this path tweak still has an effect.
    _package_parent = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, _package_parent)
    main()