From a679760b555c8eaf71efc85120a607a58aa0abe5 Mon Sep 17 00:00:00 2001 From: Yanis Rigaudeau Date: Tue, 17 Mar 2026 00:18:09 +0100 Subject: [PATCH] wip --- apps/master/src/commands/play.rs | 15 ++- apps/master/src/common/mod.rs | 6 ++ apps/master/src/main.rs | 103 +++++++++----------- libs/nats-libs/src/{functions.rs => job.rs} | 0 libs/nats-libs/src/lib.rs | 3 +- libs/nats-libs/src/stream.rs | 46 +++++++++ 6 files changed, 112 insertions(+), 61 deletions(-) create mode 100644 apps/master/src/common/mod.rs rename libs/nats-libs/src/{functions.rs => job.rs} (100%) create mode 100644 libs/nats-libs/src/stream.rs diff --git a/apps/master/src/commands/play.rs b/apps/master/src/commands/play.rs index 0326db0..a72252a 100644 --- a/apps/master/src/commands/play.rs +++ b/apps/master/src/commands/play.rs @@ -1,4 +1,4 @@ -use nats_libs::functions::JobClient; +use nats_libs::job::JobClient; use serenity::{ all::{CommandInteraction, Context, CreateCommandOption}, builder::CreateCommand, @@ -12,10 +12,12 @@ use types::{ }; use url::Url; +use crate::common::Services; + pub async fn run( + services: &Services, ctx: &Context, interaction: &CommandInteraction, - nats_client: &async_nats::Client, ) -> Result<(), CorroError> { let options = interaction.data.options(); @@ -36,7 +38,8 @@ pub async fn run( let is_url = value.starts_with("https://") || value.starts_with("http://"); if !is_url { - let search_response = match nats_client + let search_response = match services + .nats_client .send_job(JobsMap::Search(SearchJob { uuid: new_uuid_v4(), query: value.to_string(), @@ -60,7 +63,8 @@ pub async fn run( url = parse_url_or_default(value.to_string()); } - let download_response = match nats_client + let download_response = match services + .nats_client .send_job(JobsMap::Download(DownloadJob { uuid: new_uuid_v4(), url, @@ -88,7 +92,8 @@ pub async fn run( .channel_id .unwrap(); - match nats_client + match services + .nats_client .send_job(JobsMap::Play(PlayJob { uuid: new_uuid_v4(), path: download_response.path, diff --git a/apps/master/src/common/mod.rs b/apps/master/src/common/mod.rs new file mode 100644 index 0000000..499a9be --- /dev/null +++ b/apps/master/src/common/mod.rs @@ -0,0 +1,6 @@ +pub struct Services { + pub nats_client: async_nats::Client, + pub jetstream_client: async_nats::jetstream::Context, + pub jetstream_stream: async_nats::jetstream::stream::Stream, + pub jetstream_kv: async_nats::jetstream::kv::Store, +} diff --git a/apps/master/src/main.rs b/apps/master/src/main.rs index cb707cb..f808618 100644 --- a/apps/master/src/main.rs +++ b/apps/master/src/main.rs @@ -1,13 +1,13 @@ mod commands; +mod common; use std::env; -use async_nats::jetstream::{self, context::traits::Requester}; +use async_nats::jetstream; +use nats_libs::stream::{JetstreamClient, StreamClient}; use serenity::{ - Client, all::{Context, EventHandler, GatewayIntents}, async_trait, - futures::future::ErrInto, model::{ application::{Command, Interaction}, gateway::Ready, @@ -16,8 +16,10 @@ use serenity::{ use serenity_libs::functions::CustomInteraction; use types::error::{CorroError, CorroErrorType}; +use crate::common::Services; + struct Handler { - nats_client: async_nats::Client, + services: Services, } #[async_trait] @@ -27,11 +29,26 @@ impl EventHandler for Handler { println!("Received command interaction: {command:?}"); if let Err(why) = match command.data.name.as_str() { - "play" => commands::play::run(&ctx, &command, &self.nats_client).await, + "play" => { + let guild_id = command.guild_id.unwrap(); + + if let Ok(sequence) = self + .services + .jetstream_client + .lock(guild_id.into()) + .await + { + let result = commands::play::run(&self.services, &ctx, &command).await; + let _ = self.services.jetstream_stream.unlock(sequence).await; + result + } else { + Err(CorroError { error_type: CorroErrorType::NatsError, message: format!("Cannot lock {guild_id}") }) + } + } "ping" => commands::ping::run(&ctx, &command).await, _ => Err(CorroError { error_type: CorroErrorType::CommandError, - message: format!("Command {} not implemented", &command.data.name), + message: format!("Command {} not implemented :(", &command.data.name), }), } { println!("{:?}: {}", why.error_type, why.message); @@ -75,7 +92,15 @@ async fn main() { let jetstream_client = jetstream::new(nats_client.clone()); - let stream = jetstream_client + let jetstream_kv = jetstream_client + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "profiles".to_string(), + ..Default::default() + }) + .await + .expect("Error creating queue kv"); + + let jetstream_stream = jetstream_client .create_stream(jetstream::stream::Config { name: "LOCKS".to_string(), subjects: vec!["locks.*".to_string()], @@ -88,54 +113,22 @@ async fn main() { .await .expect("Error creating locks stream"); - println!("created the stream"); + let handler = Handler { + services: Services { + nats_client, + jetstream_client, + jetstream_stream, + jetstream_kv, + }, + }; - match jetstream_client.publish("locks.a", "test".into()).await.unwrap().await { - Ok(a) => { - println!("publish ok {:?}", a); - } - Err(why) => { - println!("{why:?}"); - } + let mut discord_client = + serenity::Client::builder(discord_token, GatewayIntents::non_privileged()) + .event_handler(handler) + .await + .expect("Error creating discord client"); + + if let Err(why) = discord_client.start().await { + println!("Client start error: {why:?}"); } - - // match jetstream_client.request("locks.a", "test").await { - // Ok(a) => { - // println!("publish ok {a:?}"); - // Ok("a") - // } - // Err(why) => { - // println!("{why:?}"); - // Err("a") - // } - // }; - // match jetstream_client.publish("locks.b", "test".into()).await { - // Ok(a) => { - // println!("publish ok {a:?}"); - // } - // Err(why) => { - // println!("{why:?}"); - // } - // }; - // match jetstream_client.publish("locks.a", "test".into()).await { - // Ok(a) => { - // println!("publish ok {a:?}"); - // } - // Err(why) => { - // println!("{why:?}"); - // } - // }; - - println!("done"); - - // let handler = Handler { nats_client }; - - // let mut discord_client = Client::builder(discord_token, GatewayIntents::non_privileged()) - // .event_handler(handler) - // .await - // .expect("Error creating discord client"); - - // if let Err(why) = discord_client.start().await { - // println!("Client start error: {why:?}"); - // } } diff --git a/libs/nats-libs/src/functions.rs b/libs/nats-libs/src/job.rs similarity index 100% rename from libs/nats-libs/src/functions.rs rename to libs/nats-libs/src/job.rs diff --git a/libs/nats-libs/src/lib.rs b/libs/nats-libs/src/lib.rs index 014fff1..77113d8 100644 --- a/libs/nats-libs/src/lib.rs +++ b/libs/nats-libs/src/lib.rs @@ -1 +1,2 @@ -pub mod functions; +pub mod job; +pub mod stream; diff --git a/libs/nats-libs/src/stream.rs b/libs/nats-libs/src/stream.rs new file mode 100644 index 0000000..ad4cec4 --- /dev/null +++ b/libs/nats-libs/src/stream.rs @@ -0,0 +1,46 @@ +use std::num::NonZeroU64; + +use async_nats::jetstream::{Context, stream::Stream}; +use async_trait::async_trait; +use types::error::{CorroError, CorroErrorType}; + +#[async_trait] +pub trait JetstreamClient { + async fn lock(&self, id: NonZeroU64) -> Result; +} + +#[async_trait] +impl JetstreamClient for Context { + async fn lock(&self, id: NonZeroU64) -> Result { + match self + .publish(format!("locks.{id}"), "".into()) + .await + .unwrap() + .await + { + Ok(ack) => Ok(ack.sequence), + Err(why) => Err(CorroError { + error_type: CorroErrorType::NatsError, + message: why.to_string(), + }), + } + } +} + +#[async_trait] +pub trait StreamClient { + async fn unlock(&self, sequence: u64) -> Result; +} + +#[async_trait] +impl StreamClient for Stream { + async fn unlock(&self, sequence: u64) -> Result { + match self.delete_message(sequence).await { + Ok(result) => Ok(result), + Err(why) => Err(CorroError { + error_type: CorroErrorType::NatsError, + message: why.to_string(), + }), + } + } +}