From a51c4258c278ffd8581ed4d08a80f3db131478ae Mon Sep 17 00:00:00 2001 From: Yanis Rigaudeau Date: Thu, 19 Mar 2026 01:14:46 +0100 Subject: [PATCH] wip --- .dockerignore | 1 + Cargo.lock | 2 + apps/master/Cargo.toml | 2 + apps/master/src/commands/mod.rs | 75 ++++++++++++++++++++++++++--- apps/master/src/commands/play.rs | 71 ++++++++++++++------------- apps/master/src/common/mod.rs | 8 ++- apps/master/src/main.rs | 63 ++++++++++++++++++++++-- apps/worker/src/main.rs | 3 +- libs/nats-libs/src/job.rs | 23 ++++++++- libs/nats-libs/src/kv.rs | 11 +++++ libs/serenity-libs/src/functions.rs | 4 +- libs/types/src/jobs.rs | 5 ++ libs/types/src/queue.rs | 7 +++ 13 files changed, 223 insertions(+), 52 deletions(-) diff --git a/.dockerignore b/.dockerignore index 829ea12..1d995da 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,4 +1,5 @@ target/ +output/ .gitignore compose.dev.yml diff --git a/Cargo.lock b/Cargo.lock index ed16aa2..501884e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2247,7 +2247,9 @@ name = "master" version = "0.1.0" dependencies = [ "async-nats", + "futures", "nats-libs", + "postcard", "serenity", "serenity-libs", "tokio", diff --git a/apps/master/Cargo.toml b/apps/master/Cargo.toml index 73f38ea..324535c 100644 --- a/apps/master/Cargo.toml +++ b/apps/master/Cargo.toml @@ -5,7 +5,9 @@ edition = "2024" [dependencies] async-nats = { version = "0.46.0" } +futures = { version = "0.3.32" } nats-libs = { path = "../../libs/nats-libs" } +postcard = { version = "1.1.3", features = ["use-std"] } serenity = { version = "0.12.5", default-features = false, features = [ "cache", "client", diff --git a/apps/master/src/commands/mod.rs b/apps/master/src/commands/mod.rs index 49fc66c..485d0cd 100644 --- a/apps/master/src/commands/mod.rs +++ b/apps/master/src/commands/mod.rs @@ -1,20 +1,25 @@ -use nats_libs::stream::{JetstreamClient, StreamClient}; -use serenity::all::{CommandInteraction, Context}; -use types::error::CorroError; +use std::num::NonZeroU64; -use crate::common::Services; +use nats_libs::{ + kv::KVClient, + stream::{JetstreamClient, StreamClient}, +}; +use serenity::all::{CommandInteraction, Context}; +use types::{error::CorroError, queue::QueueStatus}; + +use crate::common::{CommandServices, TriggerServices}; pub mod ping; pub mod play; -pub async fn queue_lock<'a, F, Fut>( - services: &'a Services, +pub async fn queue_lock_command<'a, F, Fut>( + services: &'a CommandServices, ctx: &'a Context, interaction: &'a CommandInteraction, func: F, ) -> Result<(), CorroError> where - F: Fn(&'a Services, &'a Context, &'a CommandInteraction) -> Fut, + F: Fn(&'a CommandServices, &'a Context, &'a CommandInteraction) -> Fut, Fut: Future> + 'a, { let guild_id = interaction.guild_id.unwrap(); @@ -40,3 +45,59 @@ where Err(why) => Err(why), } } + +pub async fn queue_lock_trigger<'a, F, Fut>( + services: &'a TriggerServices, + guild_id: NonZeroU64, + func: F, +) -> Result<(), CorroError> +where + F: Fn(&'a TriggerServices, NonZeroU64) -> Fut, + Fut: Future> + 'a, +{ + match services.jetstream_client.lock(guild_id).await { + Ok(sequence) => { + let result = func(services, guild_id).await; + + match services.jetstream_stream.unlock(sequence).await { + Ok(unlocked) => { + if !unlocked { + return Err(CorroError { + error_type: types::error::CorroErrorType::NatsError, + message: "Not unlocked".to_string(), + }); + } + } + Err(why) => return Err(why), + } + + result + } + Err(why) => Err(why), + } +} + +pub async fn trigger_queue( + services: &TriggerServices, + guild_id: NonZeroU64, +) -> Result<(), CorroError> { + let mut queue = match services.jetstream_kv.get_queue(guild_id).await { + Ok(queue) => queue, + Err(why) => { + return Err(why); + } + }; + + queue.status = QueueStatus::Playing; + + match services.jetstream_kv.set_queue(guild_id, &queue).await { + Ok(_sequence) => (), + Err(why) => { + return Err(why); + } + } + + println!("queue trigged, should launch play here"); + + Ok(()) +} diff --git a/apps/master/src/commands/play.rs b/apps/master/src/commands/play.rs index 20201b2..1fb7146 100644 --- a/apps/master/src/commands/play.rs +++ b/apps/master/src/commands/play.rs @@ -7,16 +7,16 @@ use serenity::{ use serenity_libs::functions::CustomInteraction; use types::{ error::{CorroError, CorroErrorType}, - jobs::{DownloadJob, JobsMap, JobsResponseMap, PlayJob, SearchJob}, + jobs::{DownloadJob, JobsMap, JobsResponseMap, SearchJob}, misc::{new_uuid_v4, parse_url_or_default}, - queue::{Queue, YoutubeSong}, + queue::{Queue, QueueStatus, YoutubeSong}, }; use url::Url; -use crate::common::Services; +use crate::common::CommandServices; pub async fn run( - services: &Services, + services: &CommandServices, ctx: &Context, interaction: &CommandInteraction, ) -> Result<(), CorroError> { @@ -35,6 +35,7 @@ pub async fn run( let queue = Queue { guild_id: guild_id.into(), uuid: new_uuid_v4(), + status: QueueStatus::Paused, songs: vec![], }; match services @@ -116,44 +117,48 @@ pub async fn run( artist: download_response.test, }); - match services.jetstream_kv.set_queue(guild_id.into(), &queue).await { + match services + .jetstream_kv + .set_queue(guild_id.into(), &queue) + .await + { Ok(_sequence) => (), Err(why) => { return Err(why); } } - let channel_id = guild_id - .get_user_voice_state(&ctx.http, interaction.user.id) - .await - .unwrap() - .channel_id - .unwrap(); + // let channel_id = guild_id + // .get_user_voice_state(&ctx.http, interaction.user.id) + // .await + // .unwrap() + // .channel_id + // .unwrap(); - match services - .nats_client - .send_job(JobsMap::Play(PlayJob { - uuid: new_uuid_v4(), - path: download_response.path, - channel_id: channel_id.into(), - guild_id: guild_id.into(), - })) - .await - { - Ok(resp) => match resp { - JobsResponseMap::Play(resp) => resp, - _ => { - return Err(CorroError { - error_type: CorroErrorType::JobError, - message: "Unexpected return type".to_string(), - }); - } - }, - Err(why) => return Err(why), - }; + // match services + // .nats_client + // .send_job(JobsMap::Play(PlayJob { + // uuid: new_uuid_v4(), + // path: download_response.path, + // channel_id: channel_id.into(), + // guild_id: guild_id.into(), + // })) + // .await + // { + // Ok(resp) => match resp { + // JobsResponseMap::Play(resp) => resp, + // _ => { + // return Err(CorroError { + // error_type: CorroErrorType::JobError, + // message: "Unexpected return type".to_string(), + // }); + // } + // }, + // Err(why) => return Err(why), + // }; match interaction - .edit_text_response(ctx, "Playing...".to_string()) + .edit_text_response(ctx, "Song added to queue".to_string()) .await { Ok(_) => {} diff --git a/apps/master/src/common/mod.rs b/apps/master/src/common/mod.rs index 499a9be..cb98ef6 100644 --- a/apps/master/src/common/mod.rs +++ b/apps/master/src/common/mod.rs @@ -1,6 +1,12 @@ -pub struct Services { +pub struct CommandServices { pub nats_client: async_nats::Client, pub jetstream_client: async_nats::jetstream::Context, pub jetstream_stream: async_nats::jetstream::stream::Stream, pub jetstream_kv: async_nats::jetstream::kv::Store, } + +pub struct TriggerServices { + pub jetstream_client: async_nats::jetstream::Context, + pub jetstream_stream: async_nats::jetstream::stream::Stream, + pub jetstream_kv: async_nats::jetstream::kv::Store, +} diff --git a/apps/master/src/main.rs b/apps/master/src/main.rs index b150935..ba4faa9 100644 --- a/apps/master/src/main.rs +++ b/apps/master/src/main.rs @@ -4,6 +4,9 @@ mod common; use std::env; use async_nats::jetstream; +use futures::StreamExt; +use nats_libs::job::JobClient; +use postcard::from_bytes; use serenity::{ all::{Context, EventHandler, GatewayIntents}, async_trait, @@ -13,12 +16,18 @@ use serenity::{ }, }; use serenity_libs::functions::CustomInteraction; -use types::error::{CorroError, CorroErrorType}; +use types::{ + error::{CorroError, CorroErrorType}, + jobs::TriggerMaster, +}; -use crate::{commands::queue_lock, common::Services}; +use crate::{ + commands::{queue_lock_command, queue_lock_trigger, trigger_queue}, + common::{CommandServices, TriggerServices}, +}; struct Handler { - services: Services, + services: CommandServices, } #[async_trait] @@ -27,8 +36,22 @@ impl EventHandler for Handler { if let Interaction::Command(command) = interaction { // println!("Received command interaction: {command:?}"); + let guild_id = command.guild_id.unwrap(); + if let Err(why) = match command.data.name.as_str() { - "play" => queue_lock(&self.services, &ctx, &command, commands::play::run).await, + "play" => { + let result = + queue_lock_command(&self.services, &ctx, &command, commands::play::run) + .await; + + let _ = self + .services + .nats_client + .trigger_master(guild_id.into()) + .await; + + result + } "ping" => commands::ping::run(&ctx, &command).await, _ => Err(CorroError { error_type: CorroErrorType::CommandError, @@ -74,6 +97,11 @@ async fn main() { .await .expect("Error creating nats client"); + let mut subscriber = nats_client + .queue_subscribe("corro-dj.queue.*", "group1".to_string()) + .await + .unwrap(); + let jetstream_client = jetstream::new(nats_client.clone()); let jetstream_kv = jetstream_client @@ -97,8 +125,33 @@ async fn main() { .await .expect("Error creating locks stream"); + tokio::spawn({ + let nats_client_clone = nats_client.clone(); + + let trigger_services = TriggerServices { + jetstream_client: jetstream_client.clone(), + jetstream_stream: jetstream_stream.clone(), + jetstream_kv: jetstream_kv.clone(), + }; + + async move { + while let Some(message) = subscriber.next().await { + println!("Received message {:?}", message); + + let payload: TriggerMaster = from_bytes(&message.payload).unwrap(); + + let _ = + queue_lock_trigger(&trigger_services, payload.guild_id, trigger_queue).await; + + if let Some(reply) = message.reply { + nats_client_clone.publish(reply, "".into()).await.unwrap(); + } + } + } + }); + let handler = Handler { - services: Services { + services: CommandServices { nats_client, jetstream_client, jetstream_stream, diff --git a/apps/worker/src/main.rs b/apps/worker/src/main.rs index 4dafee3..ff39479 100644 --- a/apps/worker/src/main.rs +++ b/apps/worker/src/main.rs @@ -30,11 +30,10 @@ impl EventHandler for Handler { let mut subscriber = self .nats_client - .queue_subscribe("corro-dj.*", "group1".to_string()) + .queue_subscribe("corro-dj.job.*", "group1".to_string()) .await .unwrap(); - // Receive and process messages while let Some(message) = subscriber.next().await { println!("Received message {:?}", message); diff --git a/libs/nats-libs/src/job.rs b/libs/nats-libs/src/job.rs index 3680583..f73aa42 100644 --- a/libs/nats-libs/src/job.rs +++ b/libs/nats-libs/src/job.rs @@ -1,14 +1,17 @@ +use std::num::NonZeroU64; + use async_nats::Client; use async_trait::async_trait; use postcard::{from_bytes, to_stdvec}; use types::{ error::{CorroError, CorroErrorType}, - jobs::{JobResponse, JobsMap, JobsResponseMap}, + jobs::{JobResponse, JobsMap, JobsResponseMap, TriggerMaster}, }; #[async_trait] pub trait JobClient { async fn send_job(&self, job: JobsMap) -> Result; + async fn trigger_master(&self, id: NonZeroU64) -> Result<(), CorroError>; } #[async_trait] @@ -22,7 +25,7 @@ impl JobClient for Client { match self .request( - format!("corro-dj.{subject}"), + format!("corro-dj.job.{subject}"), to_stdvec(&job).unwrap().into(), ) .await @@ -47,4 +50,20 @@ impl JobClient for Client { }), } } + + async fn trigger_master(&self, id: NonZeroU64) -> Result<(), CorroError> { + match self + .request( + format!("corro-dj.queue.{id}"), + to_stdvec(&TriggerMaster { guild_id: id }).unwrap().into(), + ) + .await + { + Ok(_) => Ok(()), + Err(why) => Err(CorroError { + error_type: CorroErrorType::NatsError, + message: why.to_string(), + }), + } + } } diff --git a/libs/nats-libs/src/kv.rs b/libs/nats-libs/src/kv.rs index 968d0c2..b862a55 100644 --- a/libs/nats-libs/src/kv.rs +++ b/libs/nats-libs/src/kv.rs @@ -16,6 +16,7 @@ fn get_key(id: NonZeroU64) -> String { pub trait KVClient { async fn get_queue(&self, id: NonZeroU64) -> Result; async fn set_queue(&self, id: NonZeroU64, queue: &Queue) -> Result; + async fn delete_queue(&self, id: NonZeroU64) -> Result<(), CorroError>; } #[async_trait] @@ -51,4 +52,14 @@ impl KVClient for Store { }), } } + + async fn delete_queue(&self, id: NonZeroU64) -> Result<(), CorroError> { + match self.delete(get_key(id)).await { + Ok(_) => Ok(()), + Err(why) => Err(CorroError { + error_type: CorroErrorType::KVError, + message: why.to_string(), + }), + } + } } diff --git a/libs/serenity-libs/src/functions.rs b/libs/serenity-libs/src/functions.rs index 5daabc7..bdd84e2 100644 --- a/libs/serenity-libs/src/functions.rs +++ b/libs/serenity-libs/src/functions.rs @@ -19,7 +19,7 @@ impl CustomInteraction for CommandInteraction { async fn create_text_response(&self, ctx: &Context, content: String) -> Result<(), CorroError> { match self .create_response( - &ctx, + &ctx.http, CreateInteractionResponse::Message( CreateInteractionResponseMessage::new().content(&content), ), @@ -36,7 +36,7 @@ impl CustomInteraction for CommandInteraction { async fn edit_text_response(&self, ctx: &Context, content: String) -> Result<(), CorroError> { match self - .edit_response(&ctx, EditInteractionResponse::new().content(&content)) + .edit_response(&ctx.http, EditInteractionResponse::new().content(&content)) .await { Ok(_) => Ok(()), diff --git a/libs/types/src/jobs.rs b/libs/types/src/jobs.rs index ff1757f..7284c1d 100644 --- a/libs/types/src/jobs.rs +++ b/libs/types/src/jobs.rs @@ -6,6 +6,11 @@ use uuid::Uuid; use crate::{error::CorroError, queue::YoutubeSong}; +#[derive(Debug, Deserialize, Serialize)] +pub struct TriggerMaster { + pub guild_id: NonZeroU64, +} + #[derive(Debug, Deserialize, Serialize)] pub enum JobsResponseMap { Search(SearchResponse), diff --git a/libs/types/src/queue.rs b/libs/types/src/queue.rs index 79d6b8f..265be37 100644 --- a/libs/types/src/queue.rs +++ b/libs/types/src/queue.rs @@ -4,6 +4,12 @@ use serde::{Deserialize, Serialize}; use url::Url; use uuid::Uuid; +#[derive(Serialize, Deserialize, Debug)] +pub enum QueueStatus { + Playing, + Paused, +} + #[derive(Serialize, Deserialize, Debug)] pub struct YoutubeSong { pub title: String, @@ -23,5 +29,6 @@ pub struct YoutubePlaylist { pub struct Queue { pub uuid: Uuid, pub guild_id: NonZeroU64, + pub status: QueueStatus, pub songs: Vec, }