This commit is contained in:
2026-03-19 01:14:46 +01:00
parent 506d4eb111
commit a51c4258c2
13 changed files with 223 additions and 52 deletions
+1
View File
@@ -1,4 +1,5 @@
target/
output/
.gitignore
compose.dev.yml
Generated
+2
View File
@@ -2247,7 +2247,9 @@ name = "master"
version = "0.1.0"
dependencies = [
"async-nats",
"futures",
"nats-libs",
"postcard",
"serenity",
"serenity-libs",
"tokio",
+2
View File
@@ -5,7 +5,9 @@ edition = "2024"
[dependencies]
async-nats = { version = "0.46.0" }
futures = { version = "0.3.32" }
nats-libs = { path = "../../libs/nats-libs" }
postcard = { version = "1.1.3", features = ["use-std"] }
serenity = { version = "0.12.5", default-features = false, features = [
"cache",
"client",
+68 -7
View File
@@ -1,20 +1,25 @@
use nats_libs::stream::{JetstreamClient, StreamClient};
use serenity::all::{CommandInteraction, Context};
use types::error::CorroError;
use std::num::NonZeroU64;
use crate::common::Services;
use nats_libs::{
kv::KVClient,
stream::{JetstreamClient, StreamClient},
};
use serenity::all::{CommandInteraction, Context};
use types::{error::CorroError, queue::QueueStatus};
use crate::common::{CommandServices, TriggerServices};
pub mod ping;
pub mod play;
pub async fn queue_lock<'a, F, Fut>(
services: &'a Services,
pub async fn queue_lock_command<'a, F, Fut>(
services: &'a CommandServices,
ctx: &'a Context,
interaction: &'a CommandInteraction,
func: F,
) -> Result<(), CorroError>
where
F: Fn(&'a Services, &'a Context, &'a CommandInteraction) -> Fut,
F: Fn(&'a CommandServices, &'a Context, &'a CommandInteraction) -> Fut,
Fut: Future<Output = Result<(), CorroError>> + 'a,
{
let guild_id = interaction.guild_id.unwrap();
@@ -40,3 +45,59 @@ where
Err(why) => Err(why),
}
}
pub async fn queue_lock_trigger<'a, F, Fut>(
services: &'a TriggerServices,
guild_id: NonZeroU64,
func: F,
) -> Result<(), CorroError>
where
F: Fn(&'a TriggerServices, NonZeroU64) -> Fut,
Fut: Future<Output = Result<(), CorroError>> + 'a,
{
match services.jetstream_client.lock(guild_id).await {
Ok(sequence) => {
let result = func(services, guild_id).await;
match services.jetstream_stream.unlock(sequence).await {
Ok(unlocked) => {
if !unlocked {
return Err(CorroError {
error_type: types::error::CorroErrorType::NatsError,
message: "Not unlocked".to_string(),
});
}
}
Err(why) => return Err(why),
}
result
}
Err(why) => Err(why),
}
}
pub async fn trigger_queue(
services: &TriggerServices,
guild_id: NonZeroU64,
) -> Result<(), CorroError> {
let mut queue = match services.jetstream_kv.get_queue(guild_id).await {
Ok(queue) => queue,
Err(why) => {
return Err(why);
}
};
queue.status = QueueStatus::Playing;
match services.jetstream_kv.set_queue(guild_id, &queue).await {
Ok(_sequence) => (),
Err(why) => {
return Err(why);
}
}
println!("queue trigged, should launch play here");
Ok(())
}
+38 -33
View File
@@ -7,16 +7,16 @@ use serenity::{
use serenity_libs::functions::CustomInteraction;
use types::{
error::{CorroError, CorroErrorType},
jobs::{DownloadJob, JobsMap, JobsResponseMap, PlayJob, SearchJob},
jobs::{DownloadJob, JobsMap, JobsResponseMap, SearchJob},
misc::{new_uuid_v4, parse_url_or_default},
queue::{Queue, YoutubeSong},
queue::{Queue, QueueStatus, YoutubeSong},
};
use url::Url;
use crate::common::Services;
use crate::common::CommandServices;
pub async fn run(
services: &Services,
services: &CommandServices,
ctx: &Context,
interaction: &CommandInteraction,
) -> Result<(), CorroError> {
@@ -35,6 +35,7 @@ pub async fn run(
let queue = Queue {
guild_id: guild_id.into(),
uuid: new_uuid_v4(),
status: QueueStatus::Paused,
songs: vec![],
};
match services
@@ -116,44 +117,48 @@ pub async fn run(
artist: download_response.test,
});
match services.jetstream_kv.set_queue(guild_id.into(), &queue).await {
match services
.jetstream_kv
.set_queue(guild_id.into(), &queue)
.await
{
Ok(_sequence) => (),
Err(why) => {
return Err(why);
}
}
let channel_id = guild_id
.get_user_voice_state(&ctx.http, interaction.user.id)
.await
.unwrap()
.channel_id
.unwrap();
// let channel_id = guild_id
// .get_user_voice_state(&ctx.http, interaction.user.id)
// .await
// .unwrap()
// .channel_id
// .unwrap();
match services
.nats_client
.send_job(JobsMap::Play(PlayJob {
uuid: new_uuid_v4(),
path: download_response.path,
channel_id: channel_id.into(),
guild_id: guild_id.into(),
}))
.await
{
Ok(resp) => match resp {
JobsResponseMap::Play(resp) => resp,
_ => {
return Err(CorroError {
error_type: CorroErrorType::JobError,
message: "Unexpected return type".to_string(),
});
}
},
Err(why) => return Err(why),
};
// match services
// .nats_client
// .send_job(JobsMap::Play(PlayJob {
// uuid: new_uuid_v4(),
// path: download_response.path,
// channel_id: channel_id.into(),
// guild_id: guild_id.into(),
// }))
// .await
// {
// Ok(resp) => match resp {
// JobsResponseMap::Play(resp) => resp,
// _ => {
// return Err(CorroError {
// error_type: CorroErrorType::JobError,
// message: "Unexpected return type".to_string(),
// });
// }
// },
// Err(why) => return Err(why),
// };
match interaction
.edit_text_response(ctx, "Playing...".to_string())
.edit_text_response(ctx, "Song added to queue".to_string())
.await
{
Ok(_) => {}
+7 -1
View File
@@ -1,6 +1,12 @@
pub struct Services {
pub struct CommandServices {
pub nats_client: async_nats::Client,
pub jetstream_client: async_nats::jetstream::Context,
pub jetstream_stream: async_nats::jetstream::stream::Stream,
pub jetstream_kv: async_nats::jetstream::kv::Store,
}
pub struct TriggerServices {
pub jetstream_client: async_nats::jetstream::Context,
pub jetstream_stream: async_nats::jetstream::stream::Stream,
pub jetstream_kv: async_nats::jetstream::kv::Store,
}
+58 -5
View File
@@ -4,6 +4,9 @@ mod common;
use std::env;
use async_nats::jetstream;
use futures::StreamExt;
use nats_libs::job::JobClient;
use postcard::from_bytes;
use serenity::{
all::{Context, EventHandler, GatewayIntents},
async_trait,
@@ -13,12 +16,18 @@ use serenity::{
},
};
use serenity_libs::functions::CustomInteraction;
use types::error::{CorroError, CorroErrorType};
use types::{
error::{CorroError, CorroErrorType},
jobs::TriggerMaster,
};
use crate::{commands::queue_lock, common::Services};
use crate::{
commands::{queue_lock_command, queue_lock_trigger, trigger_queue},
common::{CommandServices, TriggerServices},
};
struct Handler {
services: Services,
services: CommandServices,
}
#[async_trait]
@@ -27,8 +36,22 @@ impl EventHandler for Handler {
if let Interaction::Command(command) = interaction {
// println!("Received command interaction: {command:?}");
let guild_id = command.guild_id.unwrap();
if let Err(why) = match command.data.name.as_str() {
"play" => queue_lock(&self.services, &ctx, &command, commands::play::run).await,
"play" => {
let result =
queue_lock_command(&self.services, &ctx, &command, commands::play::run)
.await;
let _ = self
.services
.nats_client
.trigger_master(guild_id.into())
.await;
result
}
"ping" => commands::ping::run(&ctx, &command).await,
_ => Err(CorroError {
error_type: CorroErrorType::CommandError,
@@ -74,6 +97,11 @@ async fn main() {
.await
.expect("Error creating nats client");
let mut subscriber = nats_client
.queue_subscribe("corro-dj.queue.*", "group1".to_string())
.await
.unwrap();
let jetstream_client = jetstream::new(nats_client.clone());
let jetstream_kv = jetstream_client
@@ -97,8 +125,33 @@ async fn main() {
.await
.expect("Error creating locks stream");
tokio::spawn({
let nats_client_clone = nats_client.clone();
let trigger_services = TriggerServices {
jetstream_client: jetstream_client.clone(),
jetstream_stream: jetstream_stream.clone(),
jetstream_kv: jetstream_kv.clone(),
};
async move {
while let Some(message) = subscriber.next().await {
println!("Received message {:?}", message);
let payload: TriggerMaster = from_bytes(&message.payload).unwrap();
let _ =
queue_lock_trigger(&trigger_services, payload.guild_id, trigger_queue).await;
if let Some(reply) = message.reply {
nats_client_clone.publish(reply, "".into()).await.unwrap();
}
}
}
});
let handler = Handler {
services: Services {
services: CommandServices {
nats_client,
jetstream_client,
jetstream_stream,
+1 -2
View File
@@ -30,11 +30,10 @@ impl EventHandler for Handler {
let mut subscriber = self
.nats_client
.queue_subscribe("corro-dj.*", "group1".to_string())
.queue_subscribe("corro-dj.job.*", "group1".to_string())
.await
.unwrap();
// Receive and process messages
while let Some(message) = subscriber.next().await {
println!("Received message {:?}", message);
+21 -2
View File
@@ -1,14 +1,17 @@
use std::num::NonZeroU64;
use async_nats::Client;
use async_trait::async_trait;
use postcard::{from_bytes, to_stdvec};
use types::{
error::{CorroError, CorroErrorType},
jobs::{JobResponse, JobsMap, JobsResponseMap},
jobs::{JobResponse, JobsMap, JobsResponseMap, TriggerMaster},
};
#[async_trait]
pub trait JobClient {
async fn send_job(&self, job: JobsMap) -> Result<JobsResponseMap, CorroError>;
async fn trigger_master(&self, id: NonZeroU64) -> Result<(), CorroError>;
}
#[async_trait]
@@ -22,7 +25,7 @@ impl JobClient for Client {
match self
.request(
format!("corro-dj.{subject}"),
format!("corro-dj.job.{subject}"),
to_stdvec(&job).unwrap().into(),
)
.await
@@ -47,4 +50,20 @@ impl JobClient for Client {
}),
}
}
async fn trigger_master(&self, id: NonZeroU64) -> Result<(), CorroError> {
match self
.request(
format!("corro-dj.queue.{id}"),
to_stdvec(&TriggerMaster { guild_id: id }).unwrap().into(),
)
.await
{
Ok(_) => Ok(()),
Err(why) => Err(CorroError {
error_type: CorroErrorType::NatsError,
message: why.to_string(),
}),
}
}
}
+11
View File
@@ -16,6 +16,7 @@ fn get_key(id: NonZeroU64) -> String {
pub trait KVClient {
async fn get_queue(&self, id: NonZeroU64) -> Result<Queue, CorroError>;
async fn set_queue(&self, id: NonZeroU64, queue: &Queue) -> Result<u64, CorroError>;
async fn delete_queue(&self, id: NonZeroU64) -> Result<(), CorroError>;
}
#[async_trait]
@@ -51,4 +52,14 @@ impl KVClient for Store {
}),
}
}
async fn delete_queue(&self, id: NonZeroU64) -> Result<(), CorroError> {
match self.delete(get_key(id)).await {
Ok(_) => Ok(()),
Err(why) => Err(CorroError {
error_type: CorroErrorType::KVError,
message: why.to_string(),
}),
}
}
}
+2 -2
View File
@@ -19,7 +19,7 @@ impl CustomInteraction for CommandInteraction {
async fn create_text_response(&self, ctx: &Context, content: String) -> Result<(), CorroError> {
match self
.create_response(
&ctx,
&ctx.http,
CreateInteractionResponse::Message(
CreateInteractionResponseMessage::new().content(&content),
),
@@ -36,7 +36,7 @@ impl CustomInteraction for CommandInteraction {
async fn edit_text_response(&self, ctx: &Context, content: String) -> Result<(), CorroError> {
match self
.edit_response(&ctx, EditInteractionResponse::new().content(&content))
.edit_response(&ctx.http, EditInteractionResponse::new().content(&content))
.await
{
Ok(_) => Ok(()),
+5
View File
@@ -6,6 +6,11 @@ use uuid::Uuid;
use crate::{error::CorroError, queue::YoutubeSong};
#[derive(Debug, Deserialize, Serialize)]
pub struct TriggerMaster {
pub guild_id: NonZeroU64,
}
#[derive(Debug, Deserialize, Serialize)]
pub enum JobsResponseMap {
Search(SearchResponse),
+7
View File
@@ -4,6 +4,12 @@ use serde::{Deserialize, Serialize};
use url::Url;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Debug)]
pub enum QueueStatus {
Playing,
Paused,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct YoutubeSong {
pub title: String,
@@ -23,5 +29,6 @@ pub struct YoutubePlaylist {
pub struct Queue {
pub uuid: Uuid,
pub guild_id: NonZeroU64,
pub status: QueueStatus,
pub songs: Vec<YoutubeSong>,
}