use futures::StreamExt; use postcard::from_bytes; use types::jobs::Job; use which::which; use yt_dlp::{Downloader, client::Libraries}; #[tokio::main] async fn main() { let nats_client = async_nats::connect("nats://localhost:4222") .await .expect("Error creating nats client"); let mut subscriber = nats_client .queue_subscribe("corro-dj.*", "download".to_string()) .await .unwrap(); let libraries = Libraries::new(which("yt-dlp").unwrap(), which("ffmpeg").unwrap()); let downloader = Downloader::builder(libraries, "output") .build() .await .unwrap(); // Receive and process messages while let Some(message) = subscriber.next().await { println!("Received message {:?}", message); let result: Job = from_bytes(&message.payload).unwrap(); println!("{:?}", result); let video = match downloader.fetch_video_infos(result.inner.str).await { Ok(video) => video, Err(why) => { println!("{:?}", why); continue; } }; let audio_path = match downloader .download_audio_stream_with_quality( &video, format!("{}.opus", video.id), yt_dlp::model::AudioQuality::Best, yt_dlp::model::AudioCodecPreference::Opus, ) .await { Ok(path) => path, Err(why) => { println!("{:?}", why); continue; } }; println!("reply: {:?}", audio_path); if let Some(reply) = message.reply { nats_client .publish(reply, format!("{:?}", audio_path).into()) .await .unwrap(); } } }