From f273c60f35b8f864d9b0109343060378439e957e Mon Sep 17 00:00:00 2001 From: Yanis Rigaudeau Date: Sun, 1 Mar 2026 14:53:34 +0100 Subject: [PATCH] reply --- master/src/commands/testnats.rs | 33 +++++++++++++++++++++-------- worker/src/main.rs | 37 ++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/master/src/commands/testnats.rs b/master/src/commands/testnats.rs index 3985c9a..c3b49b1 100644 --- a/master/src/commands/testnats.rs +++ b/master/src/commands/testnats.rs @@ -1,3 +1,5 @@ +use std::str::from_utf8; + use postcard::to_stdvec; use serenity::{ all::{ @@ -6,6 +8,7 @@ use serenity::{ CreateCommandOption, CreateInteractionResponse, CreateInteractionResponseMessage, + EditInteractionResponse, }, builder::CreateCommand, model::application::{CommandOptionType, ResolvedOption, ResolvedValue}, @@ -27,7 +30,17 @@ pub async fn run( .. }) = options.first() { - let job = &Job { + interaction + .create_response( + ctx, + CreateInteractionResponse::Message( + CreateInteractionResponseMessage::new() + .content(format!("Searching: {value}...")), + ), + ) + .await?; + + let job = Job { uuid: new_uuid_v4(), kind: JobKind::Download, inner: InnerStruct { @@ -37,19 +50,21 @@ pub async fn run( println!("job {:?}", job); - if let Err(_why) = nats_client - .publish("corro-dj.download", to_stdvec(job).unwrap().into()) + let response = match nats_client + .request("corro-dj.download", to_stdvec(&job).unwrap().into()) .await { - return Err(serenity::Error::Other("send error")); - } + Ok(resp) => resp, + Err(_why) => return Err(serenity::Error::Other("send error")), + }; + + println!("response: {:?}", from_utf8(&response.payload).unwrap()); interaction - .create_response( + .edit_response( ctx, - CreateInteractionResponse::Message( - CreateInteractionResponseMessage::new().content(format!("Sent: {value}")), - ), + EditInteractionResponse::new() + .content(format!("path: {}", from_utf8(&response.payload).unwrap())), ) .await?; } else { diff --git a/worker/src/main.rs b/worker/src/main.rs index cb0887f..8393ab8 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -1,5 +1,3 @@ -use core::ops::Deref; - use futures::StreamExt; use postcard::from_bytes; use types::jobs::Job; @@ -11,12 +9,14 @@ async fn main() { let nats_client = async_nats::connect("nats://localhost:4222") .await .expect("Error creating nats client"); + let mut subscriber = nats_client .queue_subscribe("corro-dj.*", "download".to_string()) .await .unwrap(); let libraries = Libraries::new(which("yt-dlp").unwrap(), which("ffmpeg").unwrap()); + let downloader = Downloader::builder(libraries, "output") .build() .await @@ -25,23 +25,40 @@ async fn main() { // Receive and process messages while let Some(message) = subscriber.next().await { println!("Received message {:?}", message); - let result: Job = from_bytes(message.payload.deref()).unwrap(); + let result: Job = from_bytes(&message.payload).unwrap(); println!("{:?}", result); - let video = downloader - .fetch_video_infos(result.inner.str) - .await - .unwrap(); + let video = match downloader.fetch_video_infos(result.inner.str).await { + Ok(video) => video, + Err(why) => { + println!("{:?}", why); + continue; + } + }; - let audio_path = downloader + let audio_path = match downloader .download_audio_stream_with_quality( &video, format!("{}.opus", video.id), yt_dlp::model::AudioQuality::Best, yt_dlp::model::AudioCodecPreference::Opus, ) - .await; + .await + { + Ok(path) => path, + Err(why) => { + println!("{:?}", why); + continue; + } + }; - println!("{:?}", audio_path); + println!("reply: {:?}", audio_path); + + if let Some(reply) = message.reply { + nats_client + .publish(reply, format!("{:?}", audio_path).into()) + .await + .unwrap(); + } } }