import re from asyncio import AbstractEventLoop from os import path, stat from youtubesearchpython.__future__ import VideosSearch from yt_dlp import YoutubeDL from config import YoutubeConfig from entity import File class Youtube: def __init__( self, loop: AbstractEventLoop, config: YoutubeConfig, downloadDirectory: str ) -> None: self.loop = loop self.params = { "format": "bestaudio/best", "outtmpl": path.join(downloadDirectory, "%(id)s"), "restrictfilenames": True, "noplaylist": True, "ignoreerrors": True, "logtostderr": False, "verbose": False, "quiet": True, "no_warnings": True, "noprogress": True, } self.client = YoutubeDL(self.params) self.language = config.language self.region = config.region self.get_id_regex = re.compile( r"https:\/\/www\.youtube\.com\/watch\?v=(?P[\w-]*)" ) async def searchVideo(self, query: str): videosSearch = VideosSearch( query, limit=1, language=self.language, region=self.region ) return await videosSearch.next() async def fetchData(self, url: str): info = await self.loop.run_in_executor( None, lambda: self.client.extract_info(url, download=False) ) return self.client.sanitize_info(info) def getId(self, url: str) -> str | None: match = self.get_id_regex.search(url) if match is None or "id" not in match.groupdict(): return None return match.groupdict()["id"] def downloadProgress(self, progress, status): if status["status"] == "downloading": self.loop.create_task( progress( int(status["downloaded_bytes"]), int( status["total_bytes"] if "total_bytes" in status else status["total_bytes_estimate"] ), ), ) async def download(self, url: str, fileName: str, progress=None) -> None: if progress is not None: progress_hook = lambda status: self.downloadProgress(progress, status) self.client.add_progress_hook(progress_hook) await self.loop.run_in_executor(None, lambda: self.client.download(url)) if progress is not None: self.client._progress_hooks.remove(progress_hook) return File(name=fileName, size=stat(fileName).st_size)