From b63982714bef7a485eceed01c36c7e83921b288c Mon Sep 17 00:00:00 2001 From: Yanis Rigaudeau Date: Sat, 14 Mar 2026 00:07:06 +0100 Subject: [PATCH] cleaner job send function --- Cargo.lock | 27 ++++---- Dockerfile | 2 +- apps/master/Cargo.toml | 1 + apps/master/src/commands/play.rs | 97 ++++++++++++++++++++++------ apps/master/src/commands/testnats.rs | 6 +- apps/master/src/main.rs | 5 -- apps/worker/Cargo.toml | 7 +- apps/worker/src/main.rs | 44 ++++++------- apps/worker/src/workers/download.rs | 7 +- apps/worker/src/workers/play.rs | 4 -- libs/nats/Cargo.toml | 3 + libs/nats/src/functions.rs | 44 ++++++++++++- libs/types/src/jobs.rs | 28 +++----- 13 files changed, 186 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84bdce7..0f740ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2248,6 +2248,7 @@ name = "master" version = "0.1.0" dependencies = [ "async-nats", + "nats", "postcard", "serenity", "tokio", @@ -2267,9 +2268,9 @@ dependencies = [ [[package]] name = "media-seek" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b04bb6c939d4ce151ab5fe0edf3a1d12e47458839c53f0bcc8e97410e39c50f" +checksum = "bbe0e43b318d814c09360b4c525ef4caa324f079be72936efc73b8cf216ec2e3" dependencies = [ "futures-util", "thiserror 2.0.18", @@ -2359,6 +2360,9 @@ name = "nats" version = "0.1.0" dependencies = [ "async-nats", + "async-trait", + "postcard", + "types", ] [[package]] @@ -2481,9 +2485,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "opaque-debug" @@ -3989,7 +3993,7 @@ dependencies = [ [[package]] name = "songbird" version = "0.5.0" -source = "git+https://github.com/beerpsi-forks/songbird.git?branch=davey#25195877c715e08df25eda668df6b3a6bb3d3cdf" +source = "git+https://github.com/beerpsi-forks/songbird.git?branch=davey#653177e79f2275516d287861a48e5ff65cb7fa32" dependencies = [ "aead", "aes-gcm", @@ -4355,9 +4359,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.26.0" +version = "3.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", "getrandom 0.4.2", @@ -4748,9 +4752,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.22" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ "matchers", "nu-ansi-term", @@ -5711,6 +5715,7 @@ dependencies = [ "async-nats", "futures", "futures-executor", + "nats", "postcard", "rustls 0.23.37", "serenity", @@ -5784,9 +5789,9 @@ dependencies = [ [[package]] name = "yt-dlp" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21c453b0ae22daab68b6361dfb7bf37126c4b23dc2f5f6309606e9f6df2b9447" +checksum = "12e0bb5cb558359e6de1c929769b86aae55d1ed1d457ae3c61f9e1bb319fe030" dependencies = [ "async-trait", "cfg-if", diff --git a/Dockerfile b/Dockerfile index 39b5157..c9015ac 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,7 +42,7 @@ RUN \ apk add --no-cache python3 ffmpeg deno opus RUN \ - wget https://github.com/yt-dlp/yt-dlp/releases/download/2026.02.21/yt-dlp -O /usr/local/bin/yt-dlp && \ + wget https://github.com/yt-dlp/yt-dlp/releases/download/2026.03.13/yt-dlp -O /usr/local/bin/yt-dlp && \ chmod +x /usr/local/bin/yt-dlp WORKDIR /app diff --git a/apps/master/Cargo.toml b/apps/master/Cargo.toml index bc97ec6..64ad384 100644 --- a/apps/master/Cargo.toml +++ b/apps/master/Cargo.toml @@ -5,6 +5,7 @@ edition = "2024" [dependencies] async-nats = { version = "0.46.0" } +nats = { path = "../../libs/nats" } postcard = { version = "1.1.3", features = ["use-std"] } serenity = { version = "0.12.5", default-features = false, features = [ "cache", diff --git a/apps/master/src/commands/play.rs b/apps/master/src/commands/play.rs index fab911a..3d54edd 100644 --- a/apps/master/src/commands/play.rs +++ b/apps/master/src/commands/play.rs @@ -1,16 +1,18 @@ -use std::path::PathBuf; - -use postcard::{from_bytes, to_stdvec}; +use nats::functions::JobClient; use serenity::{ all::{ - CommandInteraction, Context, CreateCommandOption, CreateInteractionResponse, - CreateInteractionResponseMessage, EditInteractionResponse, + CommandInteraction, + Context, + CreateCommandOption, + CreateInteractionResponse, + CreateInteractionResponseMessage, + EditInteractionResponse, }, builder::CreateCommand, model::application::{CommandOptionType, ResolvedOption, ResolvedValue}, }; use types::{ - jobs::{DownloadJob, JobResponse, Jobs, PlayJob, SearchJob}, + jobs::{DownloadJob, JobsMap, JobsResponseMap, PlayJob, SearchJob}, misc::{new_uuid_v4, parse_url_or_default}, }; use url::Url; @@ -27,26 +29,85 @@ pub async fn run( .. }) = options.first() { + interaction + .create_response( + ctx, + CreateInteractionResponse::Message( + CreateInteractionResponseMessage::new() + .content(format!("Searching: {value}...")), + ), + ) + .await?; + let url: Url; let is_url = value.starts_with("https://") || value.starts_with("http://"); if !is_url { - let response = match nats_client - .request( - "corro-dj.search", - to_stdvec(&SearchJob { - uuid: new_uuid_v4(), - query: value.to_string(), - }) - .unwrap() - .into(), - ) + let search_response = match nats_client + .send_job(JobsMap::Search(SearchJob { + uuid: new_uuid_v4(), + query: value.to_string(), + })) .await { - Ok(resp) => resp, + Ok(resp) => match resp { + JobsResponseMap::Search(resp) => resp, + _ => return Err(serenity::Error::Other("Unexpected return type")), + }, Err(_why) => return Err(serenity::Error::Other("send error")), }; + + println!("{:?}", &search_response); + + url = search_response.song.url; + } else { + url = parse_url_or_default(value.to_string()); } + + let download_response = match nats_client + .send_job(JobsMap::Download(DownloadJob { + uuid: new_uuid_v4(), + url, + })) + .await + { + Ok(resp) => match resp { + JobsResponseMap::Download(resp) => resp, + _ => return Err(serenity::Error::Other("Unexpected return type")), + }, + Err(_why) => return Err(serenity::Error::Other("send error")), + }; + + println!("{:?}", &download_response); + + let guild_id = interaction.guild_id.unwrap(); + + let channel_id = guild_id + .get_user_voice_state(&ctx.http, interaction.user.id) + .await + .unwrap() + .channel_id + .unwrap(); + + let _ = match nats_client + .send_job(JobsMap::Play(PlayJob { + uuid: new_uuid_v4(), + path: download_response.path, + channel_id: channel_id.into(), + guild_id: guild_id.into(), + })) + .await + { + Ok(resp) => match resp { + JobsResponseMap::Play(resp) => resp, + _ => return Err(serenity::Error::Other("Unexpected return type")), + }, + Err(_why) => return Err(serenity::Error::Other("send error")), + }; + + interaction + .edit_response(ctx, EditInteractionResponse::new().content("Playing...")) + .await?; } Ok(()) } @@ -60,6 +121,6 @@ pub fn register() -> CreateCommand { "song", "Name or url of the song to play", ) - .required(false), + .required(true), ) } diff --git a/apps/master/src/commands/testnats.rs b/apps/master/src/commands/testnats.rs index be21bd8..d6f0618 100644 --- a/apps/master/src/commands/testnats.rs +++ b/apps/master/src/commands/testnats.rs @@ -14,7 +14,7 @@ use serenity::{ model::application::{CommandOptionType, ResolvedOption, ResolvedValue}, }; use types::{ - jobs::{DownloadJob, JobResponse, Jobs, PlayJob, SearchJob}, + jobs::{DownloadJob, JobResponse, JobsResponseMap, PlayJob, SearchJob}, misc::{new_uuid_v4, parse_url_or_default}, }; use url::Url; @@ -65,7 +65,7 @@ pub async fn run( .edit_response(ctx, EditInteractionResponse::new().content(error)) .await?; return Err(serenity::Error::Other("Search error")); - } else if let Some(Jobs::Search(content)) = search_response.content { + } else if let Some(JobsResponseMap::Search(content)) = search_response.content { url = content.song.url; } else { interaction @@ -102,7 +102,7 @@ pub async fn run( let text_response: String; if let Some(error) = job_response.error { text_response = error; - } else if let Some(Jobs::Download(content)) = job_response.content { + } else if let Some(JobsResponseMap::Download(content)) = job_response.content { text_response = content.path.display().to_string(); } else { text_response = "unkown".to_string(); diff --git a/apps/master/src/main.rs b/apps/master/src/main.rs index 1295e30..8038dd6 100644 --- a/apps/master/src/main.rs +++ b/apps/master/src/main.rs @@ -2,7 +2,6 @@ mod commands; use std::env; -use async_nats::Client; use serenity::{ Client, all::{Context, EventHandler, GatewayIntents}, @@ -69,10 +68,6 @@ impl EventHandler for Handler { } } -impl A for Client { - -} - #[tokio::main] async fn main() { // Configure the client with your Discord bot token in the environment. diff --git a/apps/worker/Cargo.toml b/apps/worker/Cargo.toml index d5f31be..96341dc 100644 --- a/apps/worker/Cargo.toml +++ b/apps/worker/Cargo.toml @@ -7,8 +7,11 @@ edition = "2024" async-nats = { version = "0.46.0" } futures = { version = "0.3.32" } futures-executor = { version = "0.3.32" } +nats = { path = "../../libs/nats" } postcard = { version = "1.1.3", features = ["use-std"] } -rustls = { version = "0.23.37", default-features = false, features = ["aws-lc-rs"] } +rustls = { version = "0.23.37", default-features = false, features = [ + "aws-lc-rs", +] } serenity = { version = "0.12.5", default-features = false, features = [ "cache", "rustls_backend", @@ -19,4 +22,4 @@ symphonia = { version = "0.5.5" } tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread"] } types = { path = "../../libs/types" } which = { version = "8.0.2" } -yt-dlp = { version = "2.5.0" } +yt-dlp = { version = "2.6.0" } diff --git a/apps/worker/src/main.rs b/apps/worker/src/main.rs index f3b6231..c82d973 100644 --- a/apps/worker/src/main.rs +++ b/apps/worker/src/main.rs @@ -10,7 +10,7 @@ use serenity::{ async_trait, }; use songbird::SerenityInit; -use types::jobs::{JobResponse, Jobs}; +use types::jobs::{JobResponse, JobsMap, JobsResponseMap}; use which::which; use yt_dlp::{Downloader, client::Libraries}; @@ -36,36 +36,32 @@ impl EventHandler for Handler { while let Some(message) = subscriber.next().await { println!("Received message {:?}", message); - let subject = message.subject.split(".").collect::>()[1]; + // let subject = message.subject.split(".").collect::>()[1]; + let payload: JobsMap = from_bytes(&message.payload).unwrap(); - let result = match subject { - "download" => workers::download::download( - &self.downloader, - from_bytes(&message.payload).unwrap(), - ) - .await - .map(|res| JobResponse { - content: res.content.map(Jobs::Download), - error: res.error, - }), - "play" => { - workers::play::play(&voice_manager, from_bytes(&message.payload).unwrap()) + let result = + match payload { + JobsMap::Download(payload) => { + workers::download::download(&self.downloader, payload) + .await + .map(|res| JobResponse { + content: res.content.map(JobsResponseMap::Download), + error: res.error, + }) + } + JobsMap::Play(payload) => workers::play::play(&voice_manager, payload) .await .map(|res| JobResponse { - content: res.content.map(Jobs::Play), + content: res.content.map(JobsResponseMap::Play), error: res.error, - }) - } - "search" => { - workers::search::search(&self.downloader, from_bytes(&message.payload).unwrap()) + }), + JobsMap::Search(payload) => workers::search::search(&self.downloader, payload) .await .map(|res| JobResponse { - content: res.content.map(Jobs::Search), + content: res.content.map(JobsResponseMap::Search), error: res.error, - }) - } - _ => Err(format!("subject {subject} does not exists")), - }; + }), + }; let response = match result { Ok(response) => response, diff --git a/apps/worker/src/workers/download.rs b/apps/worker/src/workers/download.rs index 283d859..ff32338 100644 --- a/apps/worker/src/workers/download.rs +++ b/apps/worker/src/workers/download.rs @@ -28,10 +28,13 @@ pub async fn download( Err(err) => return Err(err.to_string()), }; - println!("reply: {:?}", audio_path); + println!("audio_path: {:?}", &audio_path); Ok(JobResponse { - content: Some(DownloadResponse { path: audio_path }), + content: Some(DownloadResponse { + path: audio_path.clone(), + test: audio_path.display().to_string(), + }), error: None, }) } diff --git a/apps/worker/src/workers/play.rs b/apps/worker/src/workers/play.rs index 5940287..e62d230 100644 --- a/apps/worker/src/workers/play.rs +++ b/apps/worker/src/workers/play.rs @@ -37,12 +37,8 @@ pub async fn play( let track_handle = handler.play_input(src.into()); - println!("before info {:?}", track_handle); - println!("{:?}", track_handle.get_info().await); - println!("after info"); - Ok(JobResponse { content: Some(PlayResponse {}), error: None, diff --git a/libs/nats/Cargo.toml b/libs/nats/Cargo.toml index f80fc2a..a0746fa 100644 --- a/libs/nats/Cargo.toml +++ b/libs/nats/Cargo.toml @@ -5,3 +5,6 @@ edition = "2024" [dependencies] async-nats = { version = "0.46.0" } +async-trait = { version = "0.1.89" } +postcard = { version = "1.1.3", features = ["use-std"] } +types = { path = "../../libs/types" } diff --git a/libs/nats/src/functions.rs b/libs/nats/src/functions.rs index d24624b..511cca7 100644 --- a/libs/nats/src/functions.rs +++ b/libs/nats/src/functions.rs @@ -1 +1,43 @@ -pub fn() \ No newline at end of file +use async_nats::Client; +use async_trait::async_trait; +use postcard::{from_bytes, to_stdvec}; +use types::jobs::{JobResponse, JobsMap, JobsResponseMap}; + +#[async_trait] +pub trait JobClient { + async fn send_job(&self, job: JobsMap) -> Result; +} + +#[async_trait] +impl JobClient for Client { + async fn send_job(&self, job: JobsMap) -> Result { + let subject = match &job { + JobsMap::Download(_job) => "download", + JobsMap::Play(_job) => "play", + JobsMap::Search(_job) => "search", + }; + + match self + .request( + format!("corro-dj.{subject}"), + to_stdvec(&job).unwrap().into(), + ) + .await + { + Ok(response) => { + println!("{:?}", &response.payload); + + let parsed: JobResponse = from_bytes(&response.payload).unwrap(); + + if let Some(error) = parsed.error { + Err(error) + } else if let Some(content) = parsed.content { + Ok(content) + } else { + Err("Unknown content".to_string()) + } + } + Err(why) => Err(why.to_string()), + } + } +} diff --git a/libs/types/src/jobs.rs b/libs/types/src/jobs.rs index 3c1bef9..9148526 100644 --- a/libs/types/src/jobs.rs +++ b/libs/types/src/jobs.rs @@ -6,31 +6,22 @@ use uuid::Uuid; use crate::queue::YoutubeSong; - - - -pub enum JobsBody { - -} - -impl Jobs { - fn as_str(&self) -> &'static str { - match self { - Self::Search(self) => "Hello", - Self::Download => "World", - } - } -} - #[derive(Debug, Deserialize, Serialize)] -pub enum Jobs { +pub enum JobsResponseMap { Search(SearchResponse), Download(DownloadResponse), Play(PlayResponse), } +#[derive(Debug, Deserialize, Serialize)] +pub enum JobsMap { + Search(SearchJob), + Download(DownloadJob), + Play(PlayJob), +} + #[derive(Serialize, Deserialize, Debug)] -pub struct JobResponse { +pub struct JobResponse { pub content: Option, pub error: Option, } @@ -44,6 +35,7 @@ pub struct DownloadJob { #[derive(Serialize, Deserialize, Debug)] pub struct DownloadResponse { pub path: PathBuf, + pub test: String, } #[derive(Serialize, Deserialize, Debug)]