feat: implemented unified downloading
This commit is contained in:
@@ -6,7 +6,9 @@ for use from app.py background threads.
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from monochrome import discover_instances
|
||||
from monochrome.spotify_to_ids import (
|
||||
@@ -16,6 +18,8 @@ from monochrome.spotify_to_ids import (
|
||||
extract_tracks,
|
||||
search_monochrome,
|
||||
find_best_match,
|
||||
similarity,
|
||||
normalize,
|
||||
)
|
||||
from monochrome.download import (
|
||||
get_stream_url_tidal,
|
||||
@@ -23,9 +27,59 @@ from monochrome.download import (
|
||||
download_file,
|
||||
fetch_cover_art,
|
||||
embed_metadata,
|
||||
sanitize_filename,
|
||||
convert_to_mp3,
|
||||
)
|
||||
from utils import sanitize_filename, rename_from_metadata
|
||||
|
||||
|
||||
def verify_match(spotify_track, tidal_info, search_match, log):
|
||||
"""Cross-reference Spotify metadata against Tidal track data.
|
||||
Falls back to search_match fields when tidal_info has empty values.
|
||||
Returns True if the track is a valid match."""
|
||||
sp_title = spotify_track.get("title", "")
|
||||
|
||||
# Prefer tidal_info title, fall back to search match title
|
||||
ti_title = tidal_info.get("title", "") if tidal_info else ""
|
||||
if not ti_title:
|
||||
ti_title = search_match.get("title", "") if search_match else ""
|
||||
if sp_title and ti_title:
|
||||
title_sim = similarity(sp_title, ti_title)
|
||||
if title_sim < 0.5:
|
||||
log(f"[monochrome] Title mismatch: '{sp_title}' vs '{ti_title}' (sim={title_sim:.2f})")
|
||||
return False
|
||||
|
||||
sp_artist = spotify_track.get("artist", "")
|
||||
|
||||
# Prefer tidal_info artist, fall back to search match artist
|
||||
ti_artist = ""
|
||||
for src in (tidal_info, search_match):
|
||||
if not src:
|
||||
continue
|
||||
artist_obj = src.get("artist", {})
|
||||
ti_artist = artist_obj.get("name", "") if isinstance(artist_obj, dict) else str(artist_obj)
|
||||
if ti_artist:
|
||||
break
|
||||
if sp_artist and ti_artist:
|
||||
artist_sim = similarity(sp_artist, ti_artist)
|
||||
# For multi-artist tracks, Tidal may only return the primary artist.
|
||||
# Check if Tidal artist tokens are a subset of Spotify artist tokens.
|
||||
sp_tokens = set(normalize(sp_artist).split())
|
||||
ti_tokens = set(normalize(ti_artist).split())
|
||||
is_subset = ti_tokens and ti_tokens.issubset(sp_tokens)
|
||||
if artist_sim < 0.4 and not is_subset:
|
||||
log(f"[monochrome] Artist mismatch: '{sp_artist}' vs '{ti_artist}' (sim={artist_sim:.2f})")
|
||||
return False
|
||||
|
||||
# Duration check (strongest signal)
|
||||
sp_dur = spotify_track.get("duration") # milliseconds
|
||||
ti_dur = (tidal_info or {}).get("duration") # seconds
|
||||
if sp_dur and ti_dur:
|
||||
sp_seconds = sp_dur / 1000
|
||||
if abs(sp_seconds - ti_dur) > 5:
|
||||
log(f"[monochrome] Duration mismatch: {sp_seconds:.0f}s vs {ti_dur}s")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_check=None):
|
||||
@@ -39,7 +93,8 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
cancel_check: Callback () -> bool, returns True if cancelled
|
||||
|
||||
Returns:
|
||||
(success_count, total_tracks)
|
||||
(success_count, total_tracks, fail_info) where fail_info is:
|
||||
{"failed_urls": [...], "subfolder": "name" or None}
|
||||
"""
|
||||
if log is None:
|
||||
log = print
|
||||
@@ -54,10 +109,12 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
instances = discover_instances(log=log)
|
||||
|
||||
# Step 2: Parse Spotify URL
|
||||
fail_info = {"failed_urls": [], "subfolder": None}
|
||||
|
||||
sp_type, sp_id = parse_spotify_url(spotify_url)
|
||||
if not sp_type:
|
||||
log(f"[monochrome] Invalid Spotify URL: {spotify_url}")
|
||||
return 0, 0
|
||||
return 0, 0, fail_info
|
||||
|
||||
# Step 3: Fetch track list from Spotify
|
||||
log(f"[monochrome] Fetching Spotify {sp_type}: {sp_id}")
|
||||
@@ -66,25 +123,29 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
|
||||
if not tracks:
|
||||
log(f"[monochrome] Could not extract tracks from {spotify_url}")
|
||||
return 0, 0
|
||||
return 0, 0, fail_info
|
||||
|
||||
total = len(tracks)
|
||||
log(f"[monochrome] Found {total} track(s) on Spotify")
|
||||
|
||||
# Create subfolder for albums/playlists
|
||||
dl_dir = output_dir
|
||||
subfolder_name = None
|
||||
if total > 1:
|
||||
collection_name = extract_collection_name(embed_data, sp_type)
|
||||
if collection_name:
|
||||
folder_name = sanitize_filename(collection_name)
|
||||
subfolder_name = sanitize_filename(collection_name)
|
||||
else:
|
||||
folder_name = sanitize_filename(f"{sp_type}_{sp_id}")
|
||||
dl_dir = os.path.join(output_dir, folder_name)
|
||||
subfolder_name = sanitize_filename(f"{sp_type}_{sp_id}")
|
||||
dl_dir = os.path.join(output_dir, subfolder_name)
|
||||
os.makedirs(dl_dir, exist_ok=True)
|
||||
log(f"[monochrome] Saving to folder: {folder_name}")
|
||||
log(f"[monochrome] Saving to folder: {subfolder_name}")
|
||||
fail_info["subfolder"] = subfolder_name
|
||||
|
||||
success = 0
|
||||
failed_tracks = []
|
||||
failed_urls = []
|
||||
last_final_path = None
|
||||
|
||||
for i, track in enumerate(tracks):
|
||||
if cancel_check():
|
||||
@@ -101,6 +162,9 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
if not match:
|
||||
log(f"[monochrome] No match found for: {query}")
|
||||
failed_tracks.append(query)
|
||||
sp_id = track.get("sp_id")
|
||||
if sp_id:
|
||||
failed_urls.append(f"https://open.spotify.com/track/{sp_id}")
|
||||
if i < total - 1:
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
@@ -121,6 +185,9 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
if not stream_url:
|
||||
log(f"[monochrome] Failed to get stream for: {query}")
|
||||
failed_tracks.append(query)
|
||||
sp_id = track.get("sp_id")
|
||||
if sp_id:
|
||||
failed_urls.append(f"https://open.spotify.com/track/{sp_id}")
|
||||
if i < total - 1:
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
@@ -133,7 +200,17 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
if k not in info or not info[k]:
|
||||
info[k] = v
|
||||
|
||||
# Determine file extension and path
|
||||
# Verify match against Spotify metadata
|
||||
if not verify_match(track, track_data, match, log):
|
||||
failed_tracks.append(query)
|
||||
sp_id = track.get("sp_id")
|
||||
if sp_id:
|
||||
failed_urls.append(f"https://open.spotify.com/track/{sp_id}")
|
||||
if i < total - 1:
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
|
||||
# Determine file extension and path (temp name until metadata rename)
|
||||
if want_mp3:
|
||||
ext = ".flac"
|
||||
elif api_quality in ("HIGH", "LOW"):
|
||||
@@ -150,6 +227,9 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
except Exception as e:
|
||||
log(f"[monochrome] Download failed for {query}: {e}")
|
||||
failed_tracks.append(query)
|
||||
sp_id = track.get("sp_id")
|
||||
if sp_id:
|
||||
failed_urls.append(f"https://open.spotify.com/track/{sp_id}")
|
||||
if i < total - 1:
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
@@ -160,17 +240,32 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
|
||||
# Convert to MP3 if requested
|
||||
if want_mp3:
|
||||
mp3_filename = sanitize_filename(f"{m_artist} - {m_title}.mp3")
|
||||
mp3_path = os.path.join(dl_dir, mp3_filename)
|
||||
mp3_path = os.path.join(dl_dir, sanitize_filename(f"{m_artist} - {m_title}.mp3"))
|
||||
if convert_to_mp3(file_path, mp3_path, log=log):
|
||||
embed_metadata(mp3_path, info, cover_data, log=log)
|
||||
file_path = mp3_path
|
||||
|
||||
# Rename from embedded metadata for consistent naming
|
||||
final_path = rename_from_metadata(file_path)
|
||||
log(f"[monochrome] Saved: {final_path.name}")
|
||||
|
||||
success += 1
|
||||
last_final_path = final_path
|
||||
|
||||
# Rate limit between tracks
|
||||
if i < total - 1:
|
||||
time.sleep(0.5)
|
||||
|
||||
# Wrap single tracks in a Title - Artist folder
|
||||
if total == 1 and success == 1 and last_final_path:
|
||||
folder_name = sanitize_filename(last_final_path.stem)
|
||||
wrapper_dir = os.path.join(output_dir, folder_name)
|
||||
os.makedirs(wrapper_dir, exist_ok=True)
|
||||
shutil.move(str(last_final_path), os.path.join(wrapper_dir, last_final_path.name))
|
||||
subfolder_name = folder_name
|
||||
fail_info["subfolder"] = subfolder_name
|
||||
log(f"[monochrome] Saved to folder: {folder_name}")
|
||||
|
||||
# Summary
|
||||
if failed_tracks:
|
||||
log(f"[monochrome] Failed tracks ({len(failed_tracks)}):")
|
||||
@@ -178,4 +273,5 @@ def download_spotify_url(spotify_url, quality, output_dir, log=None, cancel_chec
|
||||
log(f"[monochrome] - {ft}")
|
||||
|
||||
log(f"[monochrome] Complete: {success}/{total} tracks downloaded")
|
||||
return success, total
|
||||
fail_info["failed_urls"] = failed_urls
|
||||
return success, total, fail_info
|
||||
|
||||
@@ -26,6 +26,7 @@ import subprocess
|
||||
import sys
|
||||
|
||||
from monochrome import fetch, fetch_json, discover_instances, SSL_CTX, QOBUZ_API
|
||||
from utils import sanitize_filename
|
||||
|
||||
|
||||
def extract_stream_url_from_manifest(manifest_b64, log=None):
|
||||
@@ -360,9 +361,6 @@ def embed_metadata(file_path, info, cover_data=None, log=None):
|
||||
log(f"[!] Failed to embed metadata: {e}")
|
||||
|
||||
|
||||
def sanitize_filename(name):
|
||||
return re.sub(r'[<>:"/\\|?*]', '_', name)
|
||||
|
||||
|
||||
def convert_to_mp3(input_path, output_path, bitrate="320k", log=None):
|
||||
"""Convert audio file to MP3 using ffmpeg."""
|
||||
|
||||
@@ -102,7 +102,7 @@ def extract_tracks(embed_data, sp_type, sp_id):
|
||||
else:
|
||||
artist = entity.get("subtitle", "")
|
||||
if title:
|
||||
return [{"title": title, "artist": artist}]
|
||||
return [{"title": title, "artist": artist, "sp_id": sp_id, "duration": entity.get("duration")}]
|
||||
|
||||
elif sp_type in ("album", "playlist"):
|
||||
track_list = entity.get("trackList", [])
|
||||
@@ -111,8 +111,15 @@ def extract_tracks(embed_data, sp_type, sp_id):
|
||||
for t in track_list:
|
||||
title = t.get("title", "")
|
||||
artist = t.get("subtitle", "")
|
||||
# Prefer uri (contains real Spotify track ID) over uid (internal hex UID)
|
||||
track_uid = None
|
||||
uri = t.get("uri", "")
|
||||
if uri.startswith("spotify:track:"):
|
||||
track_uid = uri.split(":")[-1]
|
||||
if not track_uid:
|
||||
track_uid = t.get("uid")
|
||||
if title:
|
||||
tracks.append({"title": title, "artist": artist})
|
||||
tracks.append({"title": title, "artist": artist, "sp_id": track_uid, "duration": t.get("duration")})
|
||||
if tracks:
|
||||
return tracks
|
||||
except (KeyError, TypeError, IndexError):
|
||||
@@ -123,7 +130,7 @@ def extract_tracks(embed_data, sp_type, sp_id):
|
||||
oembed_title = fetch_spotify_oembed(sp_type, sp_id)
|
||||
if oembed_title:
|
||||
print(f'[*] Using oEmbed fallback: "{oembed_title}"', file=sys.stderr)
|
||||
return [{"title": oembed_title, "artist": ""}]
|
||||
return [{"title": oembed_title, "artist": "", "sp_id": sp_id, "duration": None}]
|
||||
|
||||
return []
|
||||
|
||||
|
||||
Reference in New Issue
Block a user