"""Download Spotify liked songs or a playlist as tagged MP3 files.

For every Spotify track the script looks up a matching video through an
Invidious search endpoint, extracts the audio as MP3 with ``yt-dlp`` and
writes title/artist/album/date tags plus cover art taken from the Spotify
track metadata.

Usage: run with no argument to process the user's liked songs, or pass a
playlist id as the first argument to process that playlist.
"""
import base64
import concurrent.futures
import json
import os
import subprocess
import sys
import time
import urllib.parse

import requests
from mutagen.easyid3 import EasyID3
from mutagen.id3 import ID3, APIC, error
from mutagen.mp3 import MP3

CONFIG_FILE = "config.json"
DOWNLOAD_DIR = "downloads"
SPOTIFY_TRACK_API = "https://api.spotify.com/v1/tracks/{}"
SPOTIFY_PLAYLIST_API = "https://api.spotify.com/v1/playlists/{}/tracks"
SPOTIFY_LIKED_API = "https://api.spotify.com/v1/me/tracks"
# NOTE(review): public Invidious instances come and go; confirm this default
# still resolves, or set "invidious_url" in the config file.
DEFAULT_INVIDIOUS_API = "https://invidious.snopyta.org/api/v1/search?type=video&q={query}"
# Search URL template containing a "{query}" placeholder. It is filled in by
# ensure_valid_token(); search_invidious() falls back to the default above
# while it is still None.
INVIDIOUS_API = None
# Seconds to wait on any single HTTP request before giving up.
REQUEST_TIMEOUT = 10

# Translation table that deletes the characters Windows rejects in file names.
_FILENAME_STRIP_TABLE = str.maketrans("", "", r'\/:*?"<>|')


def load_config():
    """Return the parsed contents of CONFIG_FILE.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with open(CONFIG_FILE, encoding="utf-8") as f:
        return json.load(f)


def ensure_valid_token():
    """Return a Spotify access token, running the OAuth helper if needed.

    Also sets the module-level INVIDIOUS_API from the config's
    "invidious_url" entry (or the built-in default).

    The token counts as unusable when the config has no "access_token" or
    when the current time has reached its "expires_at" value; in that case
    ``spotify_oauth.py`` is executed and the config is re-read.

    Raises:
        subprocess.CalledProcessError: if the OAuth helper exits non-zero.
        KeyError: if the config still has no "access_token" afterwards.
    """
    global INVIDIOUS_API
    config = load_config()
    INVIDIOUS_API = config.get("invidious_url", DEFAULT_INVIDIOUS_API)
    token_missing = "access_token" not in config
    if token_missing or int(time.time()) >= config.get("expires_at", 0):
        print("no token running oauth")
        # Use the running interpreter so this also works on Windows and
        # inside virtualenvs, where a "python3" executable may be absent.
        interpreter = sys.executable or "python3"
        subprocess.run([interpreter, "spotify_oauth.py"], check=True)
        config = load_config()
    return config["access_token"]


def _collect_track_ids(url, tok, page_size, status_messages):
    """Page through a Spotify track listing and return the track ids.

    Args:
        url: endpoint returning a JSON object with an "items" list whose
            entries carry a "track" object.
        tok: Spotify bearer token.
        page_size: value sent as ``limit``; a page shorter than this ends
            the walk.
        status_messages: mapping of HTTP status code to the message printed
            when that status stops the walk.

    Returns:
        The ids collected so far; a failed request ends the walk early
        instead of raising on the HTTP status.
    """
    ids = []
    offset = 0
    headers = {"Authorization": f"Bearer {tok}"}
    while True:
        resp = requests.get(
            url,
            headers=headers,
            params={"limit": page_size, "offset": offset},
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code in status_messages:
            print(status_messages[resp.status_code])
            break
        if not resp.ok:
            # Rate limits or server errors: report them instead of silently
            # treating the error body as "no more items".
            print(f"spotify returned {resp.status_code}, stopping early")
            break
        items = resp.json().get("items", [])
        for item in items:
            track = item.get("track")
            # Entries without a track object or id cannot be looked up later.
            if track and track.get("id"):
                ids.append(track["id"])
        if len(items) < page_size:
            break
        offset += page_size
        time.sleep(0.5)  # small pause between pages
    return ids


def get_liked_tracks(tok):
    """Return the ids of the user's saved ("liked") tracks.

    Args:
        tok: Spotify bearer token.

    Returns:
        List of track ids; partial or empty if the token is rejected.
    """
    return _collect_track_ids(
        SPOTIFY_LIKED_API, tok, 50, {401: "nah bro token's dead"}
    )


def get_playlist_tracks(tok, pid):
    """Return the ids of the tracks in playlist ``pid``.

    Args:
        tok: Spotify bearer token.
        pid: Spotify playlist id.

    Returns:
        List of track ids; partial or empty if the token is rejected or the
        playlist is not found.
    """
    return _collect_track_ids(
        SPOTIFY_PLAYLIST_API.format(pid),
        tok,
        100,
        {401: "nope, bad token", 404: "what playlist"},
    )


def fetch_metadata(tid, tok):
    """Fetch the Spotify track object for ``tid``.

    Args:
        tid: Spotify track id.
        tok: Spotify bearer token.

    Returns:
        The decoded JSON track object, or None when the request fails or
        the response is not a successful JSON reply. Returning None keeps
        an error payload from being mistaken for track metadata.
    """
    try:
        resp = requests.get(
            SPOTIFY_TRACK_API.format(tid),
            headers={"Authorization": f"Bearer {tok}"},
            timeout=REQUEST_TIMEOUT,
        )
        resp.raise_for_status()
        return resp.json()
    except (requests.RequestException, ValueError) as exc:
        print(f"metadata fetch failed for {tid}: {exc}")
        return None


def sanitize_filename(n):
    """Return ``n`` without the characters ``\\ / : * ? " < > |``.

    Leading and trailing whitespace is stripped from the result.
    """
    return n.translate(_FILENAME_STRIP_TABLE).strip()


def search_invidious(q):
    """Return a YouTube watch URL for the first search hit for ``q``.

    Args:
        q: free-text search query.

    Returns:
        ``https://www.youtube.com/watch?v=<id>`` for the first result that
        carries a "videoId", or None when the search fails or has no such
        result.
    """
    template = INVIDIOUS_API or DEFAULT_INVIDIOUS_API
    try:
        # Percent-encode the query so characters such as "&", "#" or "?" in
        # artist/title names cannot alter the request URL.
        url = template.format(query=urllib.parse.quote(q, safe=""))
        resp = requests.get(url, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        results = resp.json()
    except (requests.RequestException, ValueError, KeyError, IndexError):
        print("couldn't search :(")
        return None
    if not isinstance(results, list):
        # Anything but a JSON array (e.g. an error object) is a failed search.
        print("couldn't search :(")
        return None
    for video in results:
        if isinstance(video, dict) and video.get("videoId"):
            return f"https://www.youtube.com/watch?v={video['videoId']}"
    return None


def download_from_youtube(url, f):
    """Download ``url`` as an MP3 named ``f`` inside DOWNLOAD_DIR.

    Runs ``yt-dlp`` with audio extraction to MP3. A ``cookies.txt`` file in
    the working directory is passed along when it exists. A file that is
    already present is not downloaded again.

    Args:
        url: video URL handed to yt-dlp.
        f: target file name (not a path).

    Returns:
        True if the file already existed or yt-dlp exited successfully,
        False if yt-dlp failed or could not be started.
    """
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
    target = os.path.join(DOWNLOAD_DIR, f)
    if os.path.exists(target):
        print(f"{f}? already here bro")
        return True
    cookie_opts = ["--cookies", "cookies.txt"] if os.path.exists("cookies.txt") else []
    cmd = [
        "yt-dlp",
        *cookie_opts,
        "-x",
        "--audio-format",
        "mp3",
        "--no-playlist",
        "--no-live-from-start",
        "-o",
        target,
        url,
    ]
    try:
        subprocess.run(cmd, check=True)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: yt-dlp exited non-zero; OSError: yt-dlp could
        # not be launched. KeyboardInterrupt is deliberately not caught.
        print(f"nope, didn't work: {f}")
        return False
    print(f"yanked {f}")
    return True


def embed_album_art(path, img):
    """Download the image at ``img`` and embed it as front cover in ``path``.

    Best effort: any failure is printed and otherwise ignored.

    Args:
        path: path of the MP3 file to modify.
        img: URL of the cover image.
    """
    try:
        resp = requests.get(img, timeout=REQUEST_TIMEOUT)
        # Do not embed the body of an HTTP error page as cover art.
        resp.raise_for_status()
        audio = MP3(path, ID3=ID3)
        try:
            audio.add_tags()
        except error:
            # mutagen reported an ID3 error (presumably the tag already
            # exists); carry on with whatever tag the file has.
            pass
        audio.tags.add(
            APIC(encoding=3, mime='image/jpeg', type=3, desc='Cover', data=resp.content)
        )
        audio.save()
        print(f"art slapped on {path}")
    except Exception as e:
        print(f"eh: {e}")


def tag_mp3_with_spotify_metadata(p, m):
    """Write title, artist, album and date tags from Spotify metadata.

    When the album lists images, the first one is also embedded as cover
    art. Best effort: any failure is printed and otherwise ignored.

    Args:
        p: path of the MP3 file to tag.
        m: Spotify track object (decoded JSON).
    """
    try:
        # "album" may be absent or null; treat both as an empty album.
        album = m.get("album") or {}
        audio = MP3(p, ID3=EasyID3)
        audio["title"] = m.get("name", "")
        audio["artist"] = ", ".join(artist["name"] for artist in m.get("artists", []))
        audio["album"] = album.get("name", "")
        audio["date"] = album.get("release_date", "")
        audio.save()
        print(f"tagged: {p}")
        images = album.get("images", [])
        if images:
            embed_album_art(p, images[0]["url"])
    except Exception as e:
        print(f"tag fail on {p}: {e}")


def process_track(tid, tok):
    """Look up, download and tag a single Spotify track.

    Args:
        tid: Spotify track id.
        tok: Spotify bearer token.

    Returns:
        True when the audio file is present (downloaded now or earlier) and
        tagging was attempted, False when metadata, search or download
        failed.
    """
    meta = fetch_metadata(tid, tok)
    if not meta:
        print(f"nah nothing for {tid}")
        return False
    artists = meta.get("artists") or []
    title = meta.get("name")
    # Without an artist and a title there is nothing to search for.
    if not artists or not artists[0].get("name") or not title:
        print(f"nah nothing for {tid}")
        return False
    artist = artists[0]["name"]
    query = f"{artist} - {title} audio"
    filename = sanitize_filename(f"{artist} - {title}.mp3")
    url = search_invidious(query)
    if not url:
        print(f"nothing on invidious for {query}")
        return False
    if not download_from_youtube(url, filename):
        print(f"skipping {filename}")
        return False
    print(f"yoinked {filename}")
    tag_mp3_with_spotify_metadata(os.path.join(DOWNLOAD_DIR, filename), meta)
    return True


def main():
    """Process the playlist given as first argument, or the liked songs."""
    playlist_id = sys.argv[1] if len(sys.argv) > 1 else None
    token = ensure_valid_token()
    if playlist_id:
        print(f"playlist: {playlist_id}")
        ids = get_playlist_tracks(token, playlist_id)
    else:
        print("liked songs")
        ids = get_liked_tracks(token)
    print(f"{len(ids)} songs found, whew")
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
        futures = {pool.submit(process_track, tid, token): tid for tid in ids}
        for fut in concurrent.futures.as_completed(futures):
            try:
                fut.result()
            except Exception as e:
                # One failing track must not abort the remaining downloads.
                print(f"welp: {e}")


if __name__ == "__main__":
    main()