commit 4a42531632586bcdd2516c1f0fdcaf95fc89629b Author: WhatDidYouExpect <89535984+WhatDidYouExpect@users.noreply.github.com> Date: Sun Aug 3 16:17:38 2025 +0200 sigh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0acf19d --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +config.json +downloads/ +yt-dlp \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..0b8f33f --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +cant be assed to make this nice + +you have to have yt-dlp in your path + +and make a spotify app \ No newline at end of file diff --git a/configexample.json b/configexample.json new file mode 100644 index 0000000..3f84622 --- /dev/null +++ b/configexample.json @@ -0,0 +1,6 @@ +{ + "client_id": "", + "client_secret": "", + "redirect_uri": "http://localhost:8888/callback", + "invidious_url": "" +} \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..94374f1 --- /dev/null +++ b/main.py @@ -0,0 +1,118 @@ +import os,json,base64,requests,time,subprocess,sys,concurrent.futures +from mutagen.easyid3 import EasyID3;from mutagen.mp3 import MP3 +from mutagen.id3 import ID3,APIC,error + +CONFIG_FILE="config.json";DOWNLOAD_DIR="downloads" +SPOTIFY_TRACK_API="https://api.spotify.com/v1/tracks/{}" +SPOTIFY_PLAYLIST_API="https://api.spotify.com/v1/playlists/{}/tracks" +INVIDIOUS_API=None + +def load_config(): # opens the config file + with open(CONFIG_FILE) as f:return json.load(f) + +def ensure_valid_token(): + global INVIDIOUS_API + c=load_config() + INVIDIOUS_API=c.get("invidious_url","https://invidious.snopyta.org/api/v1/search?type=video&q={query}") + if "access_token"not in c or int(time.time())>=c.get("expires_at",0): + print("no token running oauth") + subprocess.run(["python3","spotify_oauth.py"],check=True) + c=load_config() + return c["access_token"] + +def get_liked_tracks(tok): # gets liked songs i guess + l,r=[],0;h={"Authorization":f"Bearer {tok}"} + while 1: + u=f"https://api.spotify.com/v1/me/tracks?limit=50&offset={r}" + d=requests.get(u,headers=h) + if d.status_code==401:print("nah bro token's dead");break + j=d.json();i=j.get("items",[]) + for it in i:l.append(it["track"]["id"]) + if len(i)<50:break + r+=50;time.sleep(0.5) + return l + +def get_playlist_tracks(tok,pid): # does same thing but like for playlists + l,r=[],0;h={"Authorization":f"Bearer {tok}"} + while 1: + u=f"{SPOTIFY_PLAYLIST_API.format(pid)}?limit=100&offset={r}" + d=requests.get(u,headers=h) + if d.status_code==401:print("nope, bad token");break + if d.status_code==404:print("what playlist");break + j=d.json();i=j.get("items",[]) + for t in i: + tr=t.get("track") + if tr and tr.get("id"):l.append(tr["id"]) + if len(i)<100:break + r+=100;time.sleep(0.5) + return l + +def fetch_metadata(tid,tok): # gets info about a track + return requests.get(SPOTIFY_TRACK_API.format(tid),headers={"Authorization":f"Bearer {tok}"}).json() + +def sanitize_filename(n): # removes stuff that windows would cry about + return"".join(c for c in n if c not in r'\/:*?"<>|').strip() + +def search_invidious(q): # looks up stuff on invidious, yeah + try: + r=requests.get(INVIDIOUS_API.format(query=q),timeout=10);r.raise_for_status() + for v in r.json(): + if v.get("videoId"):return f"https://www.youtube.com/watch?v={v['videoId']}" + except:print("couldn't search :(");return None + +def download_from_youtube(url,f): # yoink an mp3 + os.makedirs(DOWNLOAD_DIR,exist_ok=True) + p=os.path.join(DOWNLOAD_DIR,f) + if os.path.exists(p):print(f"{f}? already here bro");return True + c=[];copt=["--cookies","cookies.txt"] if os.path.exists("cookies.txt") else [] + cmd=["yt-dlp",*copt,"-x","--audio-format","mp3","--no-playlist","--no-live-from-start","-o",p,url] + try: + subprocess.run(cmd,check=True);print(f"yanked {f}");return True + except:print(f"nope, didn't work: {f}");return False + +def embed_album_art(path,img): # slap on some pic + try: + i=requests.get(img).content + a=MP3(path,ID3=ID3) + try:a.add_tags() + except:error() + a.tags.add(APIC(encoding=3,mime='image/jpeg',type=3,desc='Cover',data=i)) + a.save();print(f"art slapped on {path}") + except Exception as e:print(f"eh: {e}") + +def tag_mp3_with_spotify_metadata(p,m): # add title n stuff + try: + a=MP3(p,ID3=EasyID3) + a["title"]=m.get("name","") + a["artist"]=", ".join(z["name"]for z in m.get("artists",[])) + a["album"]=m.get("album",{}).get("name","") + a["date"]=m.get("album",{}).get("release_date","") + a.save();print(f"tagged: {p}") + i=m.get("album",{}).get("images",[]) + if i:embed_album_art(p,i[0]["url"]) + except Exception as e:print(f"tag fail on {p}: {e}") + +def process_track(tid,tok): # the whole shabang + m=fetch_metadata(tid,tok) + if not m:print(f"nah nothing for {tid}");return False + a=m["artists"][0]["name"];t=m["name"] + s=f"{a} - {t} audio";f=sanitize_filename(f"{a} - {t}.mp3") + u=search_invidious(s) + if not u:print(f"nothing on invidious for {s}");return False + if not download_from_youtube(u,f):print(f"skipping {f}");return False + print(f"yoinked {f}");tag_mp3_with_spotify_metadata(os.path.join(DOWNLOAD_DIR,f),m) + return True + +if __name__=="__main__": + p=sys.argv[1]if len(sys.argv)>1 else None + c=load_config();t=ensure_valid_token() + if p: + print(f"playlist: {p}");ids=get_playlist_tracks(t,p) + else: + print("liked songs");ids=get_liked_tracks(t) + print(f"{len(ids)} songs found, whew") + with concurrent.futures.ThreadPoolExecutor(max_workers=8)as x: + fut={x.submit(process_track,i,t):i for i in ids} + for f in concurrent.futures.as_completed(fut): + try:f.result() + except Exception as e:print(f"welp: {e}") diff --git a/spotify_oauth.py b/spotify_oauth.py new file mode 100644 index 0000000..1ad2420 --- /dev/null +++ b/spotify_oauth.py @@ -0,0 +1,93 @@ +import json +import os +import time +import webbrowser +import requests +import subprocess +from urllib.parse import urlencode, urlparse, parse_qs +from http.server import BaseHTTPRequestHandler, HTTPServer + +CONFIG_FILE = "config.json" +TOKEN_URL = "https://accounts.spotify.com/api/token" +AUTH_URL = "https://accounts.spotify.com/authorize" +SCOPE = "user-library-read" +def load_config(): + with open(CONFIG_FILE, "r") as f: + return json.load(f) +def save_config(data): + with open(CONFIG_FILE, "w") as f: + json.dump(data, f, indent=2) +def start_auth_flow(config): + params = { + "client_id": config["client_id"], + "response_type": "code", + "redirect_uri": config["redirect_uri"], + "scope": SCOPE + } + auth_url = f"{AUTH_URL}?{urlencode(params)}" + webbrowser.open(auth_url) + print("Waiting for callback at /callback...") + + class SpotifyAuthHandler(BaseHTTPRequestHandler): + def do_GET(self): + query = urlparse(self.path).query + params = parse_qs(query) + code = params.get("code", [None])[0] + + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + self.wfile.write(b"

You can close this window now.

") + + if code: + self.server.auth_code = code + + httpd = HTTPServer(("localhost", 8888), SpotifyAuthHandler) + httpd.handle_request() + return getattr(httpd, "auth_code", None) +def exchange_code_for_token(code, config): + auth = (config["client_id"], config["client_secret"]) + data = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": config["redirect_uri"] + } + res = requests.post(TOKEN_URL, auth=auth, data=data) + res.raise_for_status() + token_data = res.json() + config["access_token"] = token_data["access_token"] + config["refresh_token"] = token_data["refresh_token"] + config["expires_at"] = int(time.time()) + token_data["expires_in"] + save_config(config) + print("Token saved to config.json") + return token_data["access_token"] +def refresh_token_if_needed(config): + now = int(time.time()) + if "access_token" not in config or now >= config.get("expires_at", 0): + print("Refreshing token...") + auth = (config["client_id"], config["client_secret"]) + data = { + "grant_type": "refresh_token", + "refresh_token": config["refresh_token"] + } + res = requests.post(TOKEN_URL, auth=auth, data=data) + res.raise_for_status() + token_data = res.json() + config["access_token"] = token_data["access_token"] + config["expires_at"] = now + token_data["expires_in"] + save_config(config) + print("Refreshed and saved token.") + return config["access_token"] +if __name__ == "__main__": + config = load_config() + + if "refresh_token" not in config: + code = start_auth_flow(config) + if not code: + print("Authorization failed.") + exit(1) + token = exchange_code_for_token(code, config) + else: + token = refresh_token_if_needed(config) + + print("Spotify access token ready.")