diff --git a/apps/master/src/commands/testnats.rs b/apps/master/src/commands/testnats.rs index c3b49b1..eed606c 100644 --- a/apps/master/src/commands/testnats.rs +++ b/apps/master/src/commands/testnats.rs @@ -1,6 +1,4 @@ -use std::str::from_utf8; - -use postcard::to_stdvec; +use postcard::{from_bytes, to_stdvec}; use serenity::{ all::{ CommandInteraction, @@ -14,7 +12,7 @@ use serenity::{ model::application::{CommandOptionType, ResolvedOption, ResolvedValue}, }; use types::{ - jobs::{InnerStruct, Job, JobKind}, + jobs::{DownloadJob, DownloadResponse, JobResponse}, misc::new_uuid_v4, }; @@ -40,12 +38,9 @@ pub async fn run( ) .await?; - let job = Job { + let job = DownloadJob { uuid: new_uuid_v4(), - kind: JobKind::Download, - inner: InnerStruct { - str: value.to_string(), - }, + url: value.to_string(), }; println!("job {:?}", job); @@ -58,14 +53,22 @@ pub async fn run( Err(_why) => return Err(serenity::Error::Other("send error")), }; - println!("response: {:?}", from_utf8(&response.payload).unwrap()); + let job_response: JobResponse = from_bytes(&response.payload).unwrap(); + + println!("response: {:?}", job_response); + + let text_response: String; + + if let Some(error) = job_response.error { + text_response = error; + } else if let Some(content) = job_response.content { + text_response = content.path.display().to_string(); + } else { + text_response = "unkown".to_string(); + } interaction - .edit_response( - ctx, - EditInteractionResponse::new() - .content(format!("path: {}", from_utf8(&response.payload).unwrap())), - ) + .edit_response(ctx, EditInteractionResponse::new().content(text_response)) .await?; } else { interaction diff --git a/apps/worker/src/main.rs b/apps/worker/src/main.rs index 8393ab8..4378df1 100644 --- a/apps/worker/src/main.rs +++ b/apps/worker/src/main.rs @@ -1,6 +1,8 @@ +mod workers; + use futures::StreamExt; -use postcard::from_bytes; -use types::jobs::Job; +use postcard::{from_bytes, to_stdvec}; +use types::jobs::JobResponse; use which::which; use yt_dlp::{Downloader, client::Libraries}; @@ -11,7 +13,7 @@ async fn main() { .expect("Error creating nats client"); let mut subscriber = nats_client - .queue_subscribe("corro-dj.*", "download".to_string()) + .queue_subscribe("corro-dj.*", "group1".to_string()) .await .unwrap(); @@ -25,38 +27,31 @@ async fn main() { // Receive and process messages while let Some(message) = subscriber.next().await { println!("Received message {:?}", message); - let result: Job = from_bytes(&message.payload).unwrap(); - println!("{:?}", result); - let video = match downloader.fetch_video_infos(result.inner.str).await { - Ok(video) => video, - Err(why) => { - println!("{:?}", why); + let subject = message.subject.split(".").collect::>()[1]; + + let result = match subject { + "download" => { + workers::download::download(&downloader, from_bytes(&message.payload).unwrap()) + .await + } + _ => { + println!("{subject}"); continue; } }; - let audio_path = match downloader - .download_audio_stream_with_quality( - &video, - format!("{}.opus", video.id), - yt_dlp::model::AudioQuality::Best, - yt_dlp::model::AudioCodecPreference::Opus, - ) - .await - { - Ok(path) => path, - Err(why) => { - println!("{:?}", why); - continue; - } + let response = match result { + Ok(response) => response, + Err(err) => JobResponse { + content: None, + error: Some(format!("{err}")), + }, }; - println!("reply: {:?}", audio_path); - if let Some(reply) = message.reply { nats_client - .publish(reply, format!("{:?}", audio_path).into()) + .publish(reply, to_stdvec(&response).unwrap().into()) .await .unwrap(); } diff --git a/apps/worker/src/workers/download.rs b/apps/worker/src/workers/download.rs new file mode 100644 index 0000000..8b0cf21 --- /dev/null +++ b/apps/worker/src/workers/download.rs @@ -0,0 +1,34 @@ +use types::jobs::{DownloadJob, DownloadResponse, JobResponse}; +use yt_dlp::{Downloader, error::Error}; + +pub async fn download( + downloader: &Downloader, + job: DownloadJob, +) -> Result, Error> { + println!("job: {:?}", job); + + let video = match downloader.fetch_video_infos(job.url).await { + Ok(video) => video, + Err(err) => return Err(err), + }; + + let audio_path = match downloader + .download_audio_stream_with_quality( + &video, + format!("{}.opus", video.id), + yt_dlp::model::AudioQuality::Best, + yt_dlp::model::AudioCodecPreference::Opus, + ) + .await + { + Ok(path) => path, + Err(err) => return Err(err), + }; + + println!("reply: {:?}", audio_path); + + Ok(JobResponse { + content: Some(DownloadResponse { path: audio_path }), + error: None, + }) +} diff --git a/apps/worker/src/workers/mod.rs b/apps/worker/src/workers/mod.rs new file mode 100644 index 0000000..674b799 --- /dev/null +++ b/apps/worker/src/workers/mod.rs @@ -0,0 +1 @@ +pub mod download; diff --git a/libs/types/src/jobs.rs b/libs/types/src/jobs.rs index 216787e..ab20c66 100644 --- a/libs/types/src/jobs.rs +++ b/libs/types/src/jobs.rs @@ -1,19 +1,21 @@ +use std::path::PathBuf; + use serde::{Deserialize, Serialize}; use uuid::Uuid; #[derive(Serialize, Deserialize, Debug)] -pub enum JobKind { - Download, +pub struct JobResponse { + pub content: Option, + pub error: Option, } #[derive(Serialize, Deserialize, Debug)] -pub struct InnerStruct { - pub str: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct Job { +pub struct DownloadJob { pub uuid: Uuid, - pub kind: JobKind, - pub inner: InnerStruct, + pub url: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct DownloadResponse { + pub path: PathBuf, }