sigh
This commit is contained in:
commit
4a42531632
5 changed files with 225 additions and 0 deletions
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
|
@ -0,0 +1,3 @@
|
|||
config.json
|
||||
downloads/
|
||||
yt-dlp
|
5
README.md
Normal file
5
README.md
Normal file
|
@ -0,0 +1,5 @@
|
|||
cant be assed to make this nice
|
||||
|
||||
you have to have yt-dlp in your path
|
||||
|
||||
and make a spotify app
|
6
configexample.json
Normal file
6
configexample.json
Normal file
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"client_id": "",
|
||||
"client_secret": "",
|
||||
"redirect_uri": "http://localhost:8888/callback",
|
||||
"invidious_url": ""
|
||||
}
|
118
main.py
Normal file
118
main.py
Normal file
|
@ -0,0 +1,118 @@
|
|||
import os,json,base64,requests,time,subprocess,sys,concurrent.futures
|
||||
from mutagen.easyid3 import EasyID3;from mutagen.mp3 import MP3
|
||||
from mutagen.id3 import ID3,APIC,error
|
||||
|
||||
CONFIG_FILE="config.json";DOWNLOAD_DIR="downloads"
|
||||
SPOTIFY_TRACK_API="https://api.spotify.com/v1/tracks/{}"
|
||||
SPOTIFY_PLAYLIST_API="https://api.spotify.com/v1/playlists/{}/tracks"
|
||||
INVIDIOUS_API=None
|
||||
|
||||
def load_config(): # opens the config file
|
||||
with open(CONFIG_FILE) as f:return json.load(f)
|
||||
|
||||
def ensure_valid_token():
|
||||
global INVIDIOUS_API
|
||||
c=load_config()
|
||||
INVIDIOUS_API=c.get("invidious_url","https://invidious.snopyta.org/api/v1/search?type=video&q={query}")
|
||||
if "access_token"not in c or int(time.time())>=c.get("expires_at",0):
|
||||
print("no token running oauth")
|
||||
subprocess.run(["python3","spotify_oauth.py"],check=True)
|
||||
c=load_config()
|
||||
return c["access_token"]
|
||||
|
||||
def get_liked_tracks(tok): # gets liked songs i guess
|
||||
l,r=[],0;h={"Authorization":f"Bearer {tok}"}
|
||||
while 1:
|
||||
u=f"https://api.spotify.com/v1/me/tracks?limit=50&offset={r}"
|
||||
d=requests.get(u,headers=h)
|
||||
if d.status_code==401:print("nah bro token's dead");break
|
||||
j=d.json();i=j.get("items",[])
|
||||
for it in i:l.append(it["track"]["id"])
|
||||
if len(i)<50:break
|
||||
r+=50;time.sleep(0.5)
|
||||
return l
|
||||
|
||||
def get_playlist_tracks(tok,pid): # does same thing but like for playlists
|
||||
l,r=[],0;h={"Authorization":f"Bearer {tok}"}
|
||||
while 1:
|
||||
u=f"{SPOTIFY_PLAYLIST_API.format(pid)}?limit=100&offset={r}"
|
||||
d=requests.get(u,headers=h)
|
||||
if d.status_code==401:print("nope, bad token");break
|
||||
if d.status_code==404:print("what playlist");break
|
||||
j=d.json();i=j.get("items",[])
|
||||
for t in i:
|
||||
tr=t.get("track")
|
||||
if tr and tr.get("id"):l.append(tr["id"])
|
||||
if len(i)<100:break
|
||||
r+=100;time.sleep(0.5)
|
||||
return l
|
||||
|
||||
def fetch_metadata(tid,tok): # gets info about a track
|
||||
return requests.get(SPOTIFY_TRACK_API.format(tid),headers={"Authorization":f"Bearer {tok}"}).json()
|
||||
|
||||
def sanitize_filename(n): # removes stuff that windows would cry about
|
||||
return"".join(c for c in n if c not in r'\/:*?"<>|').strip()
|
||||
|
||||
def search_invidious(q): # looks up stuff on invidious, yeah
|
||||
try:
|
||||
r=requests.get(INVIDIOUS_API.format(query=q),timeout=10);r.raise_for_status()
|
||||
for v in r.json():
|
||||
if v.get("videoId"):return f"https://www.youtube.com/watch?v={v['videoId']}"
|
||||
except:print("couldn't search :(");return None
|
||||
|
||||
def download_from_youtube(url,f): # yoink an mp3
|
||||
os.makedirs(DOWNLOAD_DIR,exist_ok=True)
|
||||
p=os.path.join(DOWNLOAD_DIR,f)
|
||||
if os.path.exists(p):print(f"{f}? already here bro");return True
|
||||
c=[];copt=["--cookies","cookies.txt"] if os.path.exists("cookies.txt") else []
|
||||
cmd=["yt-dlp",*copt,"-x","--audio-format","mp3","--no-playlist","--no-live-from-start","-o",p,url]
|
||||
try:
|
||||
subprocess.run(cmd,check=True);print(f"yanked {f}");return True
|
||||
except:print(f"nope, didn't work: {f}");return False
|
||||
|
||||
def embed_album_art(path,img): # slap on some pic
|
||||
try:
|
||||
i=requests.get(img).content
|
||||
a=MP3(path,ID3=ID3)
|
||||
try:a.add_tags()
|
||||
except:error()
|
||||
a.tags.add(APIC(encoding=3,mime='image/jpeg',type=3,desc='Cover',data=i))
|
||||
a.save();print(f"art slapped on {path}")
|
||||
except Exception as e:print(f"eh: {e}")
|
||||
|
||||
def tag_mp3_with_spotify_metadata(p,m): # add title n stuff
|
||||
try:
|
||||
a=MP3(p,ID3=EasyID3)
|
||||
a["title"]=m.get("name","")
|
||||
a["artist"]=", ".join(z["name"]for z in m.get("artists",[]))
|
||||
a["album"]=m.get("album",{}).get("name","")
|
||||
a["date"]=m.get("album",{}).get("release_date","")
|
||||
a.save();print(f"tagged: {p}")
|
||||
i=m.get("album",{}).get("images",[])
|
||||
if i:embed_album_art(p,i[0]["url"])
|
||||
except Exception as e:print(f"tag fail on {p}: {e}")
|
||||
|
||||
def process_track(tid,tok): # the whole shabang
|
||||
m=fetch_metadata(tid,tok)
|
||||
if not m:print(f"nah nothing for {tid}");return False
|
||||
a=m["artists"][0]["name"];t=m["name"]
|
||||
s=f"{a} - {t} audio";f=sanitize_filename(f"{a} - {t}.mp3")
|
||||
u=search_invidious(s)
|
||||
if not u:print(f"nothing on invidious for {s}");return False
|
||||
if not download_from_youtube(u,f):print(f"skipping {f}");return False
|
||||
print(f"yoinked {f}");tag_mp3_with_spotify_metadata(os.path.join(DOWNLOAD_DIR,f),m)
|
||||
return True
|
||||
|
||||
if __name__=="__main__":
|
||||
p=sys.argv[1]if len(sys.argv)>1 else None
|
||||
c=load_config();t=ensure_valid_token()
|
||||
if p:
|
||||
print(f"playlist: {p}");ids=get_playlist_tracks(t,p)
|
||||
else:
|
||||
print("liked songs");ids=get_liked_tracks(t)
|
||||
print(f"{len(ids)} songs found, whew")
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=8)as x:
|
||||
fut={x.submit(process_track,i,t):i for i in ids}
|
||||
for f in concurrent.futures.as_completed(fut):
|
||||
try:f.result()
|
||||
except Exception as e:print(f"welp: {e}")
|
93
spotify_oauth.py
Normal file
93
spotify_oauth.py
Normal file
|
@ -0,0 +1,93 @@
|
|||
import json
|
||||
import os
|
||||
import time
|
||||
import webbrowser
|
||||
import requests
|
||||
import subprocess
|
||||
from urllib.parse import urlencode, urlparse, parse_qs
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
|
||||
CONFIG_FILE = "config.json"
|
||||
TOKEN_URL = "https://accounts.spotify.com/api/token"
|
||||
AUTH_URL = "https://accounts.spotify.com/authorize"
|
||||
SCOPE = "user-library-read"
|
||||
def load_config():
|
||||
with open(CONFIG_FILE, "r") as f:
|
||||
return json.load(f)
|
||||
def save_config(data):
|
||||
with open(CONFIG_FILE, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
def start_auth_flow(config):
|
||||
params = {
|
||||
"client_id": config["client_id"],
|
||||
"response_type": "code",
|
||||
"redirect_uri": config["redirect_uri"],
|
||||
"scope": SCOPE
|
||||
}
|
||||
auth_url = f"{AUTH_URL}?{urlencode(params)}"
|
||||
webbrowser.open(auth_url)
|
||||
print("Waiting for callback at /callback...")
|
||||
|
||||
class SpotifyAuthHandler(BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
query = urlparse(self.path).query
|
||||
params = parse_qs(query)
|
||||
code = params.get("code", [None])[0]
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header("Content-type", "text/html")
|
||||
self.end_headers()
|
||||
self.wfile.write(b"<h1>You can close this window now.</h1>")
|
||||
|
||||
if code:
|
||||
self.server.auth_code = code
|
||||
|
||||
httpd = HTTPServer(("localhost", 8888), SpotifyAuthHandler)
|
||||
httpd.handle_request()
|
||||
return getattr(httpd, "auth_code", None)
|
||||
def exchange_code_for_token(code, config):
|
||||
auth = (config["client_id"], config["client_secret"])
|
||||
data = {
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"redirect_uri": config["redirect_uri"]
|
||||
}
|
||||
res = requests.post(TOKEN_URL, auth=auth, data=data)
|
||||
res.raise_for_status()
|
||||
token_data = res.json()
|
||||
config["access_token"] = token_data["access_token"]
|
||||
config["refresh_token"] = token_data["refresh_token"]
|
||||
config["expires_at"] = int(time.time()) + token_data["expires_in"]
|
||||
save_config(config)
|
||||
print("Token saved to config.json")
|
||||
return token_data["access_token"]
|
||||
def refresh_token_if_needed(config):
|
||||
now = int(time.time())
|
||||
if "access_token" not in config or now >= config.get("expires_at", 0):
|
||||
print("Refreshing token...")
|
||||
auth = (config["client_id"], config["client_secret"])
|
||||
data = {
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": config["refresh_token"]
|
||||
}
|
||||
res = requests.post(TOKEN_URL, auth=auth, data=data)
|
||||
res.raise_for_status()
|
||||
token_data = res.json()
|
||||
config["access_token"] = token_data["access_token"]
|
||||
config["expires_at"] = now + token_data["expires_in"]
|
||||
save_config(config)
|
||||
print("Refreshed and saved token.")
|
||||
return config["access_token"]
|
||||
if __name__ == "__main__":
|
||||
config = load_config()
|
||||
|
||||
if "refresh_token" not in config:
|
||||
code = start_auth_flow(config)
|
||||
if not code:
|
||||
print("Authorization failed.")
|
||||
exit(1)
|
||||
token = exchange_code_for_token(code, config)
|
||||
else:
|
||||
token = refresh_token_if_needed(config)
|
||||
|
||||
print("Spotify access token ready.")
|
Loading…
Add table
Add a link
Reference in a new issue