diff --git a/main.py b/main.py index 94374f1..0e91cfa 100644 --- a/main.py +++ b/main.py @@ -1,118 +1,168 @@ -import os,json,base64,requests,time,subprocess,sys,concurrent.futures -from mutagen.easyid3 import EasyID3;from mutagen.mp3 import MP3 -from mutagen.id3 import ID3,APIC,error +import os, json, base64, requests, time, subprocess, sys, concurrent.futures +from mutagen.easyid3 import EasyID3 +from mutagen.mp3 import MP3 +from mutagen.id3 import ID3, APIC, error -CONFIG_FILE="config.json";DOWNLOAD_DIR="downloads" -SPOTIFY_TRACK_API="https://api.spotify.com/v1/tracks/{}" -SPOTIFY_PLAYLIST_API="https://api.spotify.com/v1/playlists/{}/tracks" -INVIDIOUS_API=None +CONFIG_FILE = "config.json" +DOWNLOAD_DIR = "downloads" +SPOTIFY_TRACK_API = "https://api.spotify.com/v1/tracks/{}" +SPOTIFY_PLAYLIST_API = "https://api.spotify.com/v1/playlists/{}/tracks" +INVIDIOUS_API = None -def load_config(): # opens the config file - with open(CONFIG_FILE) as f:return json.load(f) +def load_config(): # opens the config file + with open(CONFIG_FILE) as f: + return json.load(f) def ensure_valid_token(): - global INVIDIOUS_API - c=load_config() - INVIDIOUS_API=c.get("invidious_url","https://invidious.snopyta.org/api/v1/search?type=video&q={query}") - if "access_token"not in c or int(time.time())>=c.get("expires_at",0): - print("no token running oauth") - subprocess.run(["python3","spotify_oauth.py"],check=True) - c=load_config() - return c["access_token"] + global INVIDIOUS_API + c = load_config() + INVIDIOUS_API = c.get("invidious_url", "https://invidious.snopyta.org/api/v1/search?type=video&q={query}") + if "access_token" not in c or int(time.time()) >= c.get("expires_at", 0): + print("no token running oauth") + subprocess.run(["python3", "spotify_oauth.py"], check=True) + c = load_config() + return c["access_token"] -def get_liked_tracks(tok): # gets liked songs i guess - l,r=[],0;h={"Authorization":f"Bearer {tok}"} - while 1: - u=f"https://api.spotify.com/v1/me/tracks?limit=50&offset={r}" - d=requests.get(u,headers=h) - if d.status_code==401:print("nah bro token's dead");break - j=d.json();i=j.get("items",[]) - for it in i:l.append(it["track"]["id"]) - if len(i)<50:break - r+=50;time.sleep(0.5) - return l +def get_liked_tracks(tok): # gets liked songs i guess + l, r = [], 0 + h = {"Authorization": f"Bearer {tok}"} + while 1: + u = f"https://api.spotify.com/v1/me/tracks?limit=50&offset={r}" + d = requests.get(u, headers=h) + if d.status_code == 401: + print("nah bro token's dead") + break + j = d.json() + i = j.get("items", []) + for it in i: + l.append(it["track"]["id"]) + if len(i) < 50: + break + r += 50 + time.sleep(0.5) + return l -def get_playlist_tracks(tok,pid): # does same thing but like for playlists - l,r=[],0;h={"Authorization":f"Bearer {tok}"} - while 1: - u=f"{SPOTIFY_PLAYLIST_API.format(pid)}?limit=100&offset={r}" - d=requests.get(u,headers=h) - if d.status_code==401:print("nope, bad token");break - if d.status_code==404:print("what playlist");break - j=d.json();i=j.get("items",[]) - for t in i: - tr=t.get("track") - if tr and tr.get("id"):l.append(tr["id"]) - if len(i)<100:break - r+=100;time.sleep(0.5) - return l +def get_playlist_tracks(tok, pid): # does same thing but like for playlists + l, r = [], 0 + h = {"Authorization": f"Bearer {tok}"} + while 1: + u = f"{SPOTIFY_PLAYLIST_API.format(pid)}?limit=100&offset={r}" + d = requests.get(u, headers=h) + if d.status_code == 401: + print("nope, bad token") + break + if d.status_code == 404: + print("what playlist") + break + j = d.json() + i = j.get("items", []) + for t in i: + tr = t.get("track") + if tr and tr.get("id"): + l.append(tr["id"]) + if len(i) < 100: + break + r += 100 + time.sleep(0.5) + return l -def fetch_metadata(tid,tok): # gets info about a track - return requests.get(SPOTIFY_TRACK_API.format(tid),headers={"Authorization":f"Bearer {tok}"}).json() +def fetch_metadata(tid, tok): # gets info about a track + return requests.get(SPOTIFY_TRACK_API.format(tid), headers={"Authorization": f"Bearer {tok}"}).json() -def sanitize_filename(n): # removes stuff that windows would cry about - return"".join(c for c in n if c not in r'\/:*?"<>|').strip() +def sanitize_filename(n): # removes stuff that windows would cry about + return "".join(c for c in n if c not in r'\/:*?"<>|').strip() -def search_invidious(q): # looks up stuff on invidious, yeah - try: - r=requests.get(INVIDIOUS_API.format(query=q),timeout=10);r.raise_for_status() - for v in r.json(): - if v.get("videoId"):return f"https://www.youtube.com/watch?v={v['videoId']}" - except:print("couldn't search :(");return None +def search_invidious(q): # looks up stuff on invidious + try: + r = requests.get(INVIDIOUS_API.format(query=q), timeout=10) + r.raise_for_status() + for v in r.json(): + if v.get("videoId"): + return f"https://www.youtube.com/watch?v={v['videoId']}" + except: + print("couldn't search :(") + return None -def download_from_youtube(url,f): # yoink an mp3 - os.makedirs(DOWNLOAD_DIR,exist_ok=True) - p=os.path.join(DOWNLOAD_DIR,f) - if os.path.exists(p):print(f"{f}? already here bro");return True - c=[];copt=["--cookies","cookies.txt"] if os.path.exists("cookies.txt") else [] - cmd=["yt-dlp",*copt,"-x","--audio-format","mp3","--no-playlist","--no-live-from-start","-o",p,url] - try: - subprocess.run(cmd,check=True);print(f"yanked {f}");return True - except:print(f"nope, didn't work: {f}");return False +def download_from_youtube(url, f): # yoink an mp3 + os.makedirs(DOWNLOAD_DIR, exist_ok=True) + p = os.path.join(DOWNLOAD_DIR, f) + if os.path.exists(p): + print(f"{f}? already here bro") + return True + copt = ["--cookies", "cookies.txt"] if os.path.exists("cookies.txt") else [] + cmd = ["yt-dlp", *copt, "-x", "--audio-format", "mp3", "--no-playlist", "--no-live-from-start", "-o", p, url] + try: + subprocess.run(cmd, check=True) + print(f"yanked {f}") + return True + except: + print(f"nope, didn't work: {f}") + return False -def embed_album_art(path,img): # slap on some pic - try: - i=requests.get(img).content - a=MP3(path,ID3=ID3) - try:a.add_tags() - except:error() - a.tags.add(APIC(encoding=3,mime='image/jpeg',type=3,desc='Cover',data=i)) - a.save();print(f"art slapped on {path}") - except Exception as e:print(f"eh: {e}") +def embed_album_art(path, img): # slap on some pic + try: + i = requests.get(img).content + a = MP3(path, ID3=ID3) + try: + a.add_tags() + except error: + pass + a.tags.add(APIC(encoding=3, mime='image/jpeg', type=3, desc='Cover', data=i)) + a.save() + print(f"art slapped on {path}") + except Exception as e: + print(f"eh: {e}") -def tag_mp3_with_spotify_metadata(p,m): # add title n stuff - try: - a=MP3(p,ID3=EasyID3) - a["title"]=m.get("name","") - a["artist"]=", ".join(z["name"]for z in m.get("artists",[])) - a["album"]=m.get("album",{}).get("name","") - a["date"]=m.get("album",{}).get("release_date","") - a.save();print(f"tagged: {p}") - i=m.get("album",{}).get("images",[]) - if i:embed_album_art(p,i[0]["url"]) - except Exception as e:print(f"tag fail on {p}: {e}") +def tag_mp3_with_spotify_metadata(p, m): # add title n stuff + try: + a = MP3(p, ID3=EasyID3) + a["title"] = m.get("name", "") + a["artist"] = ", ".join(z["name"] for z in m.get("artists", [])) + a["album"] = m.get("album", {}).get("name", "") + a["date"] = m.get("album", {}).get("release_date", "") + a.save() + print(f"tagged: {p}") + i = m.get("album", {}).get("images", []) + if i: + embed_album_art(p, i[0]["url"]) + except Exception as e: + print(f"tag fail on {p}: {e}") -def process_track(tid,tok): # the whole shabang - m=fetch_metadata(tid,tok) - if not m:print(f"nah nothing for {tid}");return False - a=m["artists"][0]["name"];t=m["name"] - s=f"{a} - {t} audio";f=sanitize_filename(f"{a} - {t}.mp3") - u=search_invidious(s) - if not u:print(f"nothing on invidious for {s}");return False - if not download_from_youtube(u,f):print(f"skipping {f}");return False - print(f"yoinked {f}");tag_mp3_with_spotify_metadata(os.path.join(DOWNLOAD_DIR,f),m) - return True +def process_track(tid, tok): # the whole shabang + m = fetch_metadata(tid, tok) + if not m: + print(f"nah nothing for {tid}") + return False + a = m["artists"][0]["name"] + t = m["name"] + s = f"{a} - {t} audio" + f = sanitize_filename(f"{a} - {t}.mp3") + u = search_invidious(s) + if not u: + print(f"nothing on invidious for {s}") + return False + if not download_from_youtube(u, f): + print(f"skipping {f}") + return False + print(f"yoinked {f}") + tag_mp3_with_spotify_metadata(os.path.join(DOWNLOAD_DIR, f), m) + return True -if __name__=="__main__": - p=sys.argv[1]if len(sys.argv)>1 else None - c=load_config();t=ensure_valid_token() - if p: - print(f"playlist: {p}");ids=get_playlist_tracks(t,p) - else: - print("liked songs");ids=get_liked_tracks(t) - print(f"{len(ids)} songs found, whew") - with concurrent.futures.ThreadPoolExecutor(max_workers=8)as x: - fut={x.submit(process_track,i,t):i for i in ids} - for f in concurrent.futures.as_completed(fut): - try:f.result() - except Exception as e:print(f"welp: {e}") +if __name__ == "__main__": + p = sys.argv[1] if len(sys.argv) > 1 else None + c = load_config() + t = ensure_valid_token() + if p: + print(f"playlist: {p}") + ids = get_playlist_tracks(t, p) + else: + print("liked songs") + ids = get_liked_tracks(t) + print(f"{len(ids)} songs found, whew") + with concurrent.futures.ThreadPoolExecutor(max_workers=8) as x: + fut = {x.submit(process_track, i, t): i for i in ids} + for f in concurrent.futures.as_completed(fut): + try: + f.result() + except Exception as e: + print(f"welp: {e}")