From 506d4eb1110bbd7a747e269a536b01679f5000d7 Mon Sep 17 00:00:00 2001 From: Yanis Rigaudeau Date: Tue, 17 Mar 2026 22:35:13 +0100 Subject: [PATCH] queue wip --- apps/master/src/commands/play.rs | 42 +++++++++++++++++++++++-- apps/master/src/main.rs | 4 +-- libs/nats-libs/src/kv.rs | 54 ++++++++++++++++++++++++++++++++ libs/nats-libs/src/lib.rs | 1 + libs/types/src/error.rs | 1 + 5 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 libs/nats-libs/src/kv.rs diff --git a/apps/master/src/commands/play.rs b/apps/master/src/commands/play.rs index a72252a..20201b2 100644 --- a/apps/master/src/commands/play.rs +++ b/apps/master/src/commands/play.rs @@ -1,4 +1,4 @@ -use nats_libs::job::JobClient; +use nats_libs::{job::JobClient, kv::KVClient}; use serenity::{ all::{CommandInteraction, Context, CreateCommandOption}, builder::CreateCommand, @@ -9,6 +9,7 @@ use types::{ error::{CorroError, CorroErrorType}, jobs::{DownloadJob, JobsMap, JobsResponseMap, PlayJob, SearchJob}, misc::{new_uuid_v4, parse_url_or_default}, + queue::{Queue, YoutubeSong}, }; use url::Url; @@ -26,6 +27,31 @@ pub async fn run( .. }) = options.first() { + let guild_id = interaction.guild_id.unwrap(); + + let mut queue = match services.jetstream_kv.get_queue(guild_id.into()).await { + Ok(queue) => queue, + Err(_why) => { + let queue = Queue { + guild_id: guild_id.into(), + uuid: new_uuid_v4(), + songs: vec![], + }; + match services + .jetstream_kv + .set_queue(guild_id.into(), &queue) + .await + { + Ok(_sequence) => queue, + Err(why) => { + return Err(why); + } + } + } + }; + + println!("{:?}", queue); + match interaction .create_text_response(ctx, format!("Searching: {value}...")) .await @@ -83,7 +109,19 @@ pub async fn run( Err(why) => return Err(why), }; - let guild_id = interaction.guild_id.unwrap(); + queue.songs.push(YoutubeSong { + title: download_response.test.clone(), + thumbnail_url: Url::parse("https://example.com").unwrap(), + url: Url::parse("https://example.com").unwrap(), + artist: download_response.test, + }); + + match services.jetstream_kv.set_queue(guild_id.into(), &queue).await { + Ok(_sequence) => (), + Err(why) => { + return Err(why); + } + } let channel_id = guild_id .get_user_voice_state(&ctx.http, interaction.user.id) diff --git a/apps/master/src/main.rs b/apps/master/src/main.rs index f35cac1..b150935 100644 --- a/apps/master/src/main.rs +++ b/apps/master/src/main.rs @@ -25,7 +25,7 @@ struct Handler { impl EventHandler for Handler { async fn interaction_create(&self, ctx: Context, interaction: Interaction) { if let Interaction::Command(command) = interaction { - println!("Received command interaction: {command:?}"); + // println!("Received command interaction: {command:?}"); if let Err(why) = match command.data.name.as_str() { "play" => queue_lock(&self.services, &ctx, &command, commands::play::run).await, @@ -78,7 +78,7 @@ async fn main() { let jetstream_kv = jetstream_client .create_key_value(async_nats::jetstream::kv::Config { - bucket: "profiles".to_string(), + bucket: "queues".to_string(), ..Default::default() }) .await diff --git a/libs/nats-libs/src/kv.rs b/libs/nats-libs/src/kv.rs new file mode 100644 index 0000000..968d0c2 --- /dev/null +++ b/libs/nats-libs/src/kv.rs @@ -0,0 +1,54 @@ +use std::num::NonZeroU64; + +use async_nats::jetstream::kv::Store; +use async_trait::async_trait; +use postcard::{from_bytes, to_stdvec}; +use types::{ + error::{CorroError, CorroErrorType}, + queue::Queue, +}; + +fn get_key(id: NonZeroU64) -> String { + format!("corro-dj.{id}") +} + +#[async_trait] +pub trait KVClient { + async fn get_queue(&self, id: NonZeroU64) -> Result; + async fn set_queue(&self, id: NonZeroU64, queue: &Queue) -> Result; +} + +#[async_trait] +impl KVClient for Store { + async fn get_queue(&self, id: NonZeroU64) -> Result { + match self.get(get_key(id)).await { + Ok(result) => { + if let Some(result) = result { + Ok(from_bytes(&result).unwrap()) + } else { + Err(CorroError { + error_type: CorroErrorType::KVError, + message: format!("key {} is undefined", get_key(id)), + }) + } + } + Err(why) => Err(CorroError { + error_type: CorroErrorType::KVError, + message: why.to_string(), + }), + } + } + + async fn set_queue(&self, id: NonZeroU64, queue: &Queue) -> Result { + match self + .put(get_key(id), to_stdvec(&queue).unwrap().into()) + .await + { + Ok(sequence) => Ok(sequence), + Err(why) => Err(CorroError { + error_type: CorroErrorType::KVError, + message: why.to_string(), + }), + } + } +} diff --git a/libs/nats-libs/src/lib.rs b/libs/nats-libs/src/lib.rs index 77113d8..f0d62dd 100644 --- a/libs/nats-libs/src/lib.rs +++ b/libs/nats-libs/src/lib.rs @@ -1,2 +1,3 @@ pub mod job; +pub mod kv; pub mod stream; diff --git a/libs/types/src/error.rs b/libs/types/src/error.rs index 182b42f..04de6d8 100644 --- a/libs/types/src/error.rs +++ b/libs/types/src/error.rs @@ -5,6 +5,7 @@ pub enum CorroErrorType { JobError, ParseError, NatsError, + KVError, YtdlpError, CommandError, SerenityError,