dj-embe/usecase/sources.py

199 lines
7.0 KiB
Python

import re
from discord import Interaction
from entity import Album, Artist, Entry, Playlist, Title
from framework import Spotify, Youtube
from service import FileManager
class Sources:
def __init__(
self, fileManager: FileManager, youtube: Youtube, spotify: Spotify
) -> None:
self.fileManager = fileManager
# youtube
self.youtube = youtube
self.youtube_video_regex = re.compile(
r"(https:\/\/)?(www|music)\.youtube\.com\/(watch\?v=|shorts\/)\w*"
)
self.youtube_video_short_regex = re.compile(r"(https:\/\/)?youtu\.be\/\w*")
self.youtube_playlist_regex = re.compile(
r"(https:\/\/)?www\.youtube\.com\/playlist\?list=\w*"
)
# spotify
self.spotify = spotify
self.spotify_track_regex = re.compile(
r"(https:\/\/)?(open.spotify.com\/|spotify:)track(\/|:)\w*"
)
self.spotify_playlist_regex = re.compile(
r"(https:\/\/)?(open.spotify.com\/|spotify:)playlist(\/|:)\w*"
)
self.spotify_album_regex = re.compile(
r"(https:\/\/)?(open.spotify.com\/|spotify:)album(\/|:)\w*"
)
self.spotify_artist_regex = re.compile(
r"(https:\/\/)?(open.spotify.com\/|spotify:)artist(\/|:)\w*"
)
async def processQuery(
self, interaction: Interaction, query: str
) -> list[Entry] | None:
if (
self.youtube_video_regex.match(query) is not None
or self.youtube_video_short_regex.match(query) is not None
):
data = await self.youtube.fetchData(query)
if data is None:
return None
# with open("search_result.json", "w") as file:
# file.write(json.dumps(data))
entry = Entry(
title=Title(name=data["title"], url=data["webpage_url"]),
artist=Artist(name=data["channel"], url=data["channel_url"]),
duration=data["duration"],
thumbnail=data["thumbnail"],
requesterId=interaction.user.id,
source=data["webpage_url"],
)
entry.source = await anext(self.fileManager.download(interaction, [entry]))
return [entry]
if self.youtube_playlist_regex.match(query) is not None:
data = await self.youtube.fetchData(query)
if data is None:
return None
# with open("search_result.json", "w") as file:
# file.write(json.dumps(data))
playlist = Playlist(
name=data["title"],
url=data["webpage_url"],
owner=Artist(name=data["channel"], url=data["channel_url"]),
)
entries = [
Entry(
title=Title(
name=entry_data["title"], url=entry_data["webpage_url"]
),
artist=Artist(
name=entry_data["channel"], url=entry_data["channel_url"]
),
duration=entry_data["duration"],
thumbnail=entry_data["thumbnail"],
requesterId=interaction.user.id,
playlist=playlist,
source=entry_data["webpage_url"],
)
for entry_data in data["entries"]
]
index = 0
async for file in self.fileManager.download(interaction, entries):
entries[index].source = file
index += 1
return entries
if self.spotify_track_regex.match(query) is not None:
data = await self.spotify.getTrack(query)
# with open("spotify_track_result.json", "w") as file:
# file.write(json.dumps(data))
entry = Entry(
title=Title(name=data["name"], url=data["external_urls"]["spotify"]),
artist=Artist(
name=data["artists"][0]["name"],
url=data["artists"][0]["external_urls"]["spotify"],
),
album=Album(
name=data["album"]["name"],
url=data["album"]["external_urls"]["spotify"],
),
duration=0,
thumbnail=data["album"]["images"][0]["url"],
requesterId=interaction.user.id,
)
entry.source = await self.search(f"{entry.title.name} {entry.artist.name}")
entry.source = await anext(self.fileManager.download(interaction, [entry]))
return [entry]
if (
self.spotify_playlist_regex.match(query) is not None
or self.spotify_album_regex.match(query) is not None
or self.spotify_artist_regex.match(query) is not None
):
data = await self.spotify.getPlaylist(query)
# with open("spotify_playlist_result.json", "w") as file:
# file.write(json.dumps(data))
playlist = Playlist(
name=data["name"],
url=data["external_urls"]["spotify"],
owner=Artist(
name=data["owner"]["display_name"],
url=data["owner"]["external_urls"]["spotify"],
),
)
entries = [
Entry(
title=Title(
name=entry_data["track"]["name"],
url=entry_data["track"]["external_urls"]["spotify"],
),
artist=Artist(
name=entry_data["track"]["artists"][0]["name"],
url=entry_data["track"]["artists"][0]["external_urls"][
"spotify"
],
),
album=Album(
name=entry_data["track"]["album"]["name"],
url=entry_data["track"]["album"]["external_urls"]["spotify"],
),
duration=0,
thumbnail=entry_data["track"]["album"]["images"][0]["url"],
requesterId=interaction.user.id,
)
for entry_data in data["tracks"]["items"]
]
for entry in entries:
entry.source = await self.search(
f"{entry.title.name} {entry.artist.name}"
)
print(entry.source)
index = 0
async for file in self.fileManager.download(interaction, entries):
print(entries[index].source)
entries[index].source = file
print(entries[index].source)
index += 1
return entries
searchResult = await self.search(query)
if searchResult is None:
return None
return await self.processQuery(interaction, searchResult)
async def search(self, query) -> str | None:
result = await self.youtube.searchVideo(query)
if len(result) == 0:
return None
return result[0]["link"]