2025-08-03 16:22:50 +02:00
|
|
|
import os, json, base64, requests, time, subprocess, sys, concurrent.futures
|
|
|
|
from mutagen.easyid3 import EasyID3
|
|
|
|
from mutagen.mp3 import MP3
|
|
|
|
from mutagen.id3 import ID3, APIC, error
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
# JSON file that load_config() reads (token, expiry, optional invidious_url).
CONFIG_FILE = "config.json"

# Directory that downloaded audio files are written into.
DOWNLOAD_DIR = "downloads"

# Spotify Web API URL templates; "{}" is filled with a track / playlist id.
SPOTIFY_TRACK_API = "https://api.spotify.com/v1/tracks/{}"
SPOTIFY_PLAYLIST_API = "https://api.spotify.com/v1/playlists/{}/tracks"

# Invidious search URL template; None until ensure_valid_token() assigns it.
INVIDIOUS_API = None
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def load_config():
    """Read ``CONFIG_FILE`` and return the decoded JSON document.

    Returns:
        Whatever ``json.load`` produces for the file; the rest of this
        script treats it as a dict.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the platform's
    # locale encoding (e.g. cp1252 on Windows) when reading it.
    with open(CONFIG_FILE, encoding="utf-8") as f:
        return json.load(f)
|
2025-08-03 16:17:38 +02:00
|
|
|
|
|
|
|
def ensure_valid_token():
    """Return a Spotify access token, running the OAuth helper if needed.

    Side effect: sets the module-level ``INVIDIOUS_API`` search template
    from the config key ``invidious_url`` (falling back to a default
    instance URL).

    If the config has no ``access_token`` or its ``expires_at`` timestamp
    has passed, ``spotify_oauth.py`` is run as a subprocess and the config
    is loaded again.

    Returns:
        The ``access_token`` value from the (possibly reloaded) config.

    Raises:
        subprocess.CalledProcessError: if the OAuth helper exits non-zero.
        KeyError: if the config still has no ``access_token`` afterwards.
    """
    global INVIDIOUS_API
    cfg = load_config()
    INVIDIOUS_API = cfg.get(
        "invidious_url",
        "https://invidious.snopyta.org/api/v1/search?type=video&q={query}",
    )

    token_missing = "access_token" not in cfg
    if token_missing or int(time.time()) >= cfg.get("expires_at", 0):
        print("no token running oauth")
        # Use the interpreter that is running this script: a bare "python3"
        # may not exist (Windows) or may be a different environment that
        # lacks the helper's dependencies.
        interpreter = sys.executable or "python3"
        subprocess.run([interpreter, "spotify_oauth.py"], check=True)
        cfg = load_config()

    return cfg["access_token"]
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def get_liked_tracks(tok):
    """Collect the ids of the tracks in the user's saved ("liked") songs.

    Pages through ``/v1/me/tracks`` 50 items at a time, pausing 0.5 s
    between pages, until a short page is returned.

    Args:
        tok: Spotify bearer token sent in the ``Authorization`` header.

    Returns:
        List of track ids gathered so far. The list is partial (possibly
        empty) if a request fails, because the loop stops at the first
        non-200 response.
    """
    page_size = 50
    headers = {"Authorization": f"Bearer {tok}"}
    ids, offset = [], 0

    while True:
        url = f"https://api.spotify.com/v1/me/tracks?limit={page_size}&offset={offset}"
        # Without a timeout a stalled connection would hang the script.
        resp = requests.get(url, headers=headers, timeout=10)
        if resp.status_code == 401:
            print("nah bro token's dead")
            break
        if resp.status_code != 200:
            # Previously any other error looked like an empty last page
            # and the truncation went unnoticed.
            print(f"liked songs request failed with HTTP {resp.status_code}")
            break

        items = resp.json().get("items", [])
        for item in items:
            # Same guard as get_playlist_tracks: skip entries whose track
            # object or id is missing instead of raising.
            track = item.get("track")
            if track and track.get("id"):
                ids.append(track["id"])

        if len(items) < page_size:
            break
        offset += page_size
        time.sleep(0.5)

    return ids
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def get_playlist_tracks(tok, pid):
    """Collect the ids of the tracks in a Spotify playlist.

    Pages through the playlist-tracks endpoint 100 items at a time,
    pausing 0.5 s between pages, until a short page is returned.

    Args:
        tok: Spotify bearer token sent in the ``Authorization`` header.
        pid: Playlist id substituted into ``SPOTIFY_PLAYLIST_API``.

    Returns:
        List of track ids gathered so far. The list is partial (possibly
        empty) if a request fails, because the loop stops at the first
        non-200 response. Entries without a track object or id are skipped.
    """
    page_size = 100
    headers = {"Authorization": f"Bearer {tok}"}
    ids, offset = [], 0

    while True:
        url = f"{SPOTIFY_PLAYLIST_API.format(pid)}?limit={page_size}&offset={offset}"
        # Without a timeout a stalled connection would hang the script.
        resp = requests.get(url, headers=headers, timeout=10)
        if resp.status_code == 401:
            print("nope, bad token")
            break
        if resp.status_code == 404:
            print("what playlist")
            break
        if resp.status_code != 200:
            # Previously e.g. a 429 or 5xx looked like an empty last page
            # and the truncation went unnoticed.
            print(f"playlist request failed with HTTP {resp.status_code}")
            break

        items = resp.json().get("items", [])
        for entry in items:
            track = entry.get("track")
            if track and track.get("id"):
                ids.append(track["id"])

        if len(items) < page_size:
            break
        offset += page_size
        time.sleep(0.5)

    return ids
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def fetch_metadata(tid, tok):
    """Fetch the Spotify track object for ``tid`` and return the decoded JSON.

    Args:
        tid: Track id substituted into ``SPOTIFY_TRACK_API``.
        tok: Spotify bearer token sent in the ``Authorization`` header.

    Returns:
        The decoded JSON body. The HTTP status is not checked here, so an
        error response is returned as-is for the caller to inspect.
    """
    resp = requests.get(
        SPOTIFY_TRACK_API.format(tid),
        headers={"Authorization": f"Bearer {tok}"},
        # Same limit search_invidious uses; without it one stalled
        # connection blocks a worker thread indefinitely.
        timeout=10,
    )
    return resp.json()
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def sanitize_filename(n):
    """Return ``n`` with characters that Windows rejects in file names removed.

    Drops the reserved punctuation ``\\ / : * ? " < > |`` and ASCII control
    characters (code points below 32), then strips leading and trailing
    whitespace.

    Args:
        n: Proposed file name (not a path; separators are removed).

    Returns:
        The cleaned name; may be empty if nothing valid remains.
    """
    reserved = set(r'\/:*?"<>|')
    # Control characters (NUL, tab, newline, ...) are just as illegal in
    # Windows file names as the reserved punctuation.
    kept = (ch for ch in n if ch not in reserved and ord(ch) >= 32)
    return "".join(kept).strip()
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def search_invidious(q):
    """Search Invidious for ``q`` and return a YouTube URL for the first hit.

    Args:
        q: Free-text search query (artist/title text).

    Returns:
        ``https://www.youtube.com/watch?v=<videoId>`` for the first result
        that has a ``videoId``, or ``None`` if the search fails or yields
        no usable result.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    try:
        # Percent-encode the query: an unescaped '&', '#' or '+' in an
        # artist or title would otherwise truncate or alter the search.
        search_url = INVIDIOUS_API.format(query=quote(q, safe=""))
        resp = requests.get(search_url, timeout=10)
        resp.raise_for_status()
        for video in resp.json():
            if video.get("videoId"):
                return f"https://www.youtube.com/watch?v={video['videoId']}"
    except Exception:
        # Best-effort lookup: any failure (network, bad JSON, unset
        # INVIDIOUS_API) means "no result". Unlike a bare ``except:`` this
        # lets KeyboardInterrupt and SystemExit propagate.
        print("couldn't search :(")
    return None
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def download_from_youtube(url, f):
    """Download ``url`` as an MP3 named ``f`` inside ``DOWNLOAD_DIR`` via yt-dlp.

    Creates ``DOWNLOAD_DIR`` if needed. If the target file already exists,
    nothing is downloaded. A ``cookies.txt`` in the working directory is
    passed to yt-dlp when present.

    Args:
        url: Video URL handed to yt-dlp.
        f: Output file name (relative to ``DOWNLOAD_DIR``).

    Returns:
        True if the file already existed or yt-dlp exited successfully,
        False if yt-dlp failed or could not be started.
    """
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
    target = os.path.join(DOWNLOAD_DIR, f)
    if os.path.exists(target):
        print(f"{f}? already here bro")
        return True

    cookie_args = ["--cookies", "cookies.txt"] if os.path.exists("cookies.txt") else []
    # yt-dlp treats the -o value as an output *template*; a literal '%'
    # in a track name must be doubled or it is parsed as a field marker.
    out_template = target.replace("%", "%%")
    cmd = [
        "yt-dlp", *cookie_args,
        "-x", "--audio-format", "mp3",
        "--no-playlist", "--no-live-from-start",
        "-o", out_template, url,
    ]

    try:
        subprocess.run(cmd, check=True)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: yt-dlp exited non-zero.
        # OSError: yt-dlp is not installed or not executable.
        print(f"nope, didn't work: {f}")
        return False

    print(f"yanked {f}")
    return True
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def embed_album_art(path, img):
    """Download the image at ``img`` and embed it as front-cover art in ``path``.

    Best-effort: any failure is printed and swallowed, so this never raises
    for ordinary errors.

    Args:
        path: Path of the MP3 file to modify.
        img: URL of the cover image (stored with MIME type image/jpeg).
    """
    try:
        # Bound the download, and refuse non-2xx responses so an HTTP error
        # page is never embedded as if it were the picture.
        resp = requests.get(img, timeout=10)
        resp.raise_for_status()
        image_bytes = resp.content

        audio = MP3(path, ID3=ID3)
        try:
            audio.add_tags()
        except error:
            # Presumably raised when the file already carries an ID3 tag;
            # the existing tag is reused in that case.
            pass

        audio.tags.add(
            APIC(encoding=3, mime='image/jpeg', type=3, desc='Cover', data=image_bytes)
        )
        audio.save()
        print(f"art slapped on {path}")
    except Exception as e:
        print(f"eh: {e}")
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def tag_mp3_with_spotify_metadata(p, m):
    """Write title, artist, album and date tags from Spotify metadata into an MP3.

    After saving the text tags, the first entry of the album's ``images``
    list (if any) is passed to ``embed_album_art``. Any exception is
    printed and swallowed.

    Args:
        p: Path of the MP3 file to tag.
        m: Spotify track object (dict) as returned by ``fetch_metadata``.
    """
    try:
        audio = MP3(p, ID3=EasyID3)

        audio["title"] = m.get("name", "")

        artist_names = [artist["name"] for artist in m.get("artists", [])]
        audio["artist"] = ", ".join(artist_names)

        album_info = m.get("album", {})
        audio["album"] = album_info.get("name", "")
        audio["date"] = album_info.get("release_date", "")

        audio.save()
        print(f"tagged: {p}")

        covers = album_info.get("images", [])
        if covers:
            embed_album_art(p, covers[0]["url"])
    except Exception as exc:
        print(f"tag fail on {p}: {exc}")
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
def process_track(tid, tok):
    """Run the full pipeline for one track: metadata, search, download, tag.

    Args:
        tid: Spotify track id.
        tok: Spotify bearer token.

    Returns:
        True if the track was downloaded (or already present) and tagging
        was attempted; False if metadata, search or download failed.
    """
    meta = fetch_metadata(tid, tok)
    # A Spotify error response is a non-empty dict without "artists" or
    # "name", so a plain truthiness check would let it through and the
    # lookups below would raise KeyError/IndexError.
    if not isinstance(meta, dict) or not meta.get("artists") or "name" not in meta:
        print(f"nah nothing for {tid}")
        return False

    artist = meta["artists"][0]["name"]
    title = meta["name"]
    query = f"{artist} - {title} audio"
    filename = sanitize_filename(f"{artist} - {title}.mp3")

    url = search_invidious(query)
    if not url:
        print(f"nothing on invidious for {query}")
        return False

    if not download_from_youtube(url, filename):
        print(f"skipping {filename}")
        return False

    print(f"yoinked {filename}")
    tag_mp3_with_spotify_metadata(os.path.join(DOWNLOAD_DIR, filename), meta)
    return True
|
2025-08-03 16:17:38 +02:00
|
|
|
|
2025-08-03 16:22:50 +02:00
|
|
|
if __name__ == "__main__":
    # Optional first CLI argument is a playlist id; without it the user's
    # liked songs are processed.
    playlist_id = sys.argv[1] if len(sys.argv) > 1 else None

    # ensure_valid_token() loads the config itself, so the separate
    # load_config() call whose result was never used has been dropped.
    token = ensure_valid_token()

    if playlist_id:
        print(f"playlist: {playlist_id}")
        ids = get_playlist_tracks(token, playlist_id)
    else:
        print("liked songs")
        ids = get_liked_tracks(token)

    print(f"{len(ids)} songs found, whew")

    # Tracks are processed concurrently on up to 8 worker threads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(process_track, track_id, token) for track_id in ids]
        for done in concurrent.futures.as_completed(futures):
            try:
                done.result()
            except Exception as e:
                # One failing track must not abort the rest of the batch.
                print(f"welp: {e}")
|