lot of refactor

This commit is contained in:
2026-03-14 19:21:56 +01:00
parent b63982714b
commit 8d4a002b73
22 changed files with 256 additions and 313 deletions
+2 -3
View File
@@ -5,8 +5,7 @@ edition = "2024"
[dependencies]
async-nats = { version = "0.46.0" }
nats = { path = "../../libs/nats" }
postcard = { version = "1.1.3", features = ["use-std"] }
nats-libs = { path = "../../libs/nats-libs" }
serenity = { version = "0.12.5", default-features = false, features = [
"cache",
"client",
@@ -14,7 +13,7 @@ serenity = { version = "0.12.5", default-features = false, features = [
"gateway",
"rustls_backend",
] }
serenity-libs = { path = "../../libs/serenity-libs" }
tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread"] }
types = { path = "../../libs/types" }
url = { version = "2.5.8", features = ["serde"] }
uuid = { version = "1.22.0" }
-1
View File
@@ -1,3 +1,2 @@
pub mod ping;
pub mod play;
pub mod testnats;
+11 -3
View File
@@ -1,7 +1,15 @@
use serenity::{builder::CreateCommand, model::application::ResolvedOption};
use serenity::{
all::{CommandInteraction, Context},
builder::CreateCommand,
};
use serenity_libs::functions::CustomInteraction;
use types::error::CorroError;
pub fn run(_options: &[ResolvedOption]) -> String {
"Pong!".to_string()
pub async fn run(ctx: &Context, interaction: &CommandInteraction) -> Result<(), CorroError> {
let _ = interaction
.create_text_response(ctx, "Pong!".to_string())
.await;
Ok(())
}
pub fn register() -> CreateCommand {
+42 -33
View File
@@ -1,17 +1,12 @@
use nats::functions::JobClient;
use nats_libs::functions::JobClient;
use serenity::{
all::{
CommandInteraction,
Context,
CreateCommandOption,
CreateInteractionResponse,
CreateInteractionResponseMessage,
EditInteractionResponse,
},
all::{CommandInteraction, Context, CreateCommandOption},
builder::CreateCommand,
model::application::{CommandOptionType, ResolvedOption, ResolvedValue},
};
use serenity_libs::functions::CustomInteraction;
use types::{
error::{CorroError, CorroErrorType},
jobs::{DownloadJob, JobsMap, JobsResponseMap, PlayJob, SearchJob},
misc::{new_uuid_v4, parse_url_or_default},
};
@@ -21,7 +16,7 @@ pub async fn run(
ctx: &Context,
interaction: &CommandInteraction,
nats_client: &async_nats::Client,
) -> Result<(), serenity::Error> {
) -> Result<(), CorroError> {
let options = interaction.data.options();
if let Some(ResolvedOption {
@@ -29,15 +24,13 @@ pub async fn run(
..
}) = options.first()
{
interaction
.create_response(
ctx,
CreateInteractionResponse::Message(
CreateInteractionResponseMessage::new()
.content(format!("Searching: {value}...")),
),
)
.await?;
match interaction
.create_text_response(ctx, format!("Searching: {value}..."))
.await
{
Ok(_) => {}
Err(why) => return Err(why),
};
let url: Url;
let is_url = value.starts_with("https://") || value.starts_with("http://");
@@ -52,13 +45,16 @@ pub async fn run(
{
Ok(resp) => match resp {
JobsResponseMap::Search(resp) => resp,
_ => return Err(serenity::Error::Other("Unexpected return type")),
_ => {
return Err(CorroError {
error_type: CorroErrorType::JobError,
message: "Unexpected return type".to_string(),
});
}
},
Err(_why) => return Err(serenity::Error::Other("send error")),
Err(why) => return Err(why),
};
println!("{:?}", &search_response);
url = search_response.song.url;
} else {
url = parse_url_or_default(value.to_string());
@@ -73,13 +69,16 @@ pub async fn run(
{
Ok(resp) => match resp {
JobsResponseMap::Download(resp) => resp,
_ => return Err(serenity::Error::Other("Unexpected return type")),
_ => {
return Err(CorroError {
error_type: CorroErrorType::JobError,
message: "Unexpected return type".to_string(),
});
}
},
Err(_why) => return Err(serenity::Error::Other("send error")),
Err(why) => return Err(why),
};
println!("{:?}", &download_response);
let guild_id = interaction.guild_id.unwrap();
let channel_id = guild_id
@@ -89,7 +88,7 @@ pub async fn run(
.channel_id
.unwrap();
let _ = match nats_client
match nats_client
.send_job(JobsMap::Play(PlayJob {
uuid: new_uuid_v4(),
path: download_response.path,
@@ -100,15 +99,25 @@ pub async fn run(
{
Ok(resp) => match resp {
JobsResponseMap::Play(resp) => resp,
_ => return Err(serenity::Error::Other("Unexpected return type")),
_ => {
return Err(CorroError {
error_type: CorroErrorType::JobError,
message: "Unexpected return type".to_string(),
});
}
},
Err(_why) => return Err(serenity::Error::Other("send error")),
Err(why) => return Err(why),
};
interaction
.edit_response(ctx, EditInteractionResponse::new().content("Playing..."))
.await?;
match interaction
.edit_text_response(ctx, "Playing...".to_string())
.await
{
Ok(_) => {}
Err(why) => return Err(why),
};
}
Ok(())
}
-169
View File
@@ -1,169 +0,0 @@
use std::path::PathBuf;
use postcard::{from_bytes, to_stdvec};
use serenity::{
all::{
CommandInteraction,
Context,
CreateCommandOption,
CreateInteractionResponse,
CreateInteractionResponseMessage,
EditInteractionResponse,
},
builder::CreateCommand,
model::application::{CommandOptionType, ResolvedOption, ResolvedValue},
};
use types::{
jobs::{DownloadJob, JobResponse, JobsResponseMap, PlayJob, SearchJob},
misc::{new_uuid_v4, parse_url_or_default},
};
use url::Url;
pub async fn run(
ctx: &Context,
interaction: &CommandInteraction,
nats_client: &async_nats::Client,
) -> Result<(), serenity::Error> {
let options = interaction.data.options();
if let Some(ResolvedOption {
value: ResolvedValue::String(value),
..
}) = options.first()
{
interaction
.create_response(
ctx,
CreateInteractionResponse::Message(
CreateInteractionResponseMessage::new()
.content(format!("Searching: {value}...")),
),
)
.await?;
let url: Url;
let is_url = value.starts_with("https://");
if !is_url {
let search_job = SearchJob {
uuid: new_uuid_v4(),
query: value.to_string(),
};
let response = match nats_client
.request("corro-dj.search", to_stdvec(&search_job).unwrap().into())
.await
{
Ok(resp) => resp,
Err(_why) => return Err(serenity::Error::Other("send error")),
};
let search_response: JobResponse = from_bytes(&response.payload).unwrap();
if let Some(error) = search_response.error {
interaction
.edit_response(ctx, EditInteractionResponse::new().content(error))
.await?;
return Err(serenity::Error::Other("Search error"));
} else if let Some(JobsResponseMap::Search(content)) = search_response.content {
url = content.song.url;
} else {
interaction
.edit_response(ctx, EditInteractionResponse::new().content("unknown error"))
.await?;
return Err(serenity::Error::Other("unknown error"));
}
} else {
url = parse_url_or_default(value.to_string());
}
let download_job = DownloadJob {
uuid: new_uuid_v4(),
url,
};
println!("job {:?}", download_job);
let response = match nats_client
.request(
"corro-dj.download",
to_stdvec(&download_job).unwrap().into(),
)
.await
{
Ok(resp) => resp,
Err(_why) => return Err(serenity::Error::Other("send error")),
};
let job_response: JobResponse = from_bytes(&response.payload).unwrap();
println!("response: {:?}", job_response);
let text_response: String;
if let Some(error) = job_response.error {
text_response = error;
} else if let Some(JobsResponseMap::Download(content)) = job_response.content {
text_response = content.path.display().to_string();
} else {
text_response = "unkown".to_string();
}
interaction
.edit_response(ctx, EditInteractionResponse::new().content(&text_response))
.await?;
let guild_id = interaction.guild_id.unwrap();
let channel_id = guild_id
.get_user_voice_state(&ctx.http, interaction.user.id)
.await
.unwrap()
.channel_id
.unwrap();
let play_job = PlayJob {
uuid: new_uuid_v4(),
path: PathBuf::from(&text_response),
guild_id: guild_id.into(),
channel_id: channel_id.into(),
};
println!("job {:?}", play_job);
let _ = match nats_client
.request("corro-dj.play", to_stdvec(&play_job).unwrap().into())
.await
{
Ok(resp) => resp,
Err(_why) => return Err(serenity::Error::Other("send error")),
};
interaction
.edit_response(ctx, EditInteractionResponse::new().content("playing..."))
.await?;
} else {
interaction
.create_response(
ctx,
CreateInteractionResponse::Message(
CreateInteractionResponseMessage::new()
.content("Please provide a valid string"),
),
)
.await?;
}
Ok(())
}
pub fn register() -> CreateCommand {
CreateCommand::new("testnats")
.description("Play a song")
.add_option(
CreateCommandOption::new(
CommandOptionType::String,
"song",
"Name or url of the song to play",
)
.required(false),
)
}
+25 -30
View File
@@ -6,12 +6,13 @@ use serenity::{
Client,
all::{Context, EventHandler, GatewayIntents},
async_trait,
builder::{CreateInteractionResponse, CreateInteractionResponseMessage},
model::{
application::{Command, Interaction},
gateway::Ready,
},
};
use serenity_libs::functions::CustomInteraction;
use types::error::{CorroError, CorroErrorType};
struct Handler {
nats_client: async_nats::Client,
@@ -21,30 +22,28 @@ struct Handler {
impl EventHandler for Handler {
async fn interaction_create(&self, ctx: Context, interaction: Interaction) {
if let Interaction::Command(command) = interaction {
println!("Received command interaction: {command:#?}");
println!("Received command interaction: {command:?}");
let content = match command.data.name.as_str() {
"play" => {
commands::play::run(&ctx, &command, &self.nats_client)
.await
.unwrap();
None
}
"ping" => Some(commands::ping::run(&command.data.options())),
"testnats" => {
commands::testnats::run(&ctx, &command, &self.nats_client)
.await
.unwrap();
None
}
_ => Some("not implemented :(".to_string()),
};
if let Err(why) = match command.data.name.as_str() {
"play" => commands::play::run(&ctx, &command, &self.nats_client).await,
"ping" => commands::ping::run(&ctx, &command).await,
_ => Err(CorroError {
error_type: CorroErrorType::CommandError,
message: format!("Command {} not implemented", &command.data.name),
}),
} {
println!("{:?}: {}", why.error_type, why.message);
if let Some(content) = content {
let data = CreateInteractionResponseMessage::new().content(content);
let builder = CreateInteractionResponse::Message(data);
if let Err(why) = command.create_response(&ctx.http, builder).await {
println!("Cannot respond to slash command: {why}");
match why.error_type {
CorroErrorType::CommandError | CorroErrorType::SerenityError => {}
_ => {
let _ = command
.edit_text_response(
&ctx,
format!("{:?}: {}", why.error_type, why.message),
)
.await;
}
}
}
}
@@ -55,15 +54,11 @@ impl EventHandler for Handler {
if let Err(why) = Command::set_global_commands(
&ctx.http,
vec![
commands::ping::register(),
commands::play::register(),
commands::testnats::register(),
],
vec![commands::ping::register(), commands::play::register()],
)
.await
{
println!("Client error: {why:?}");
println!("Set commands error: {why:?}");
}
}
}
@@ -90,6 +85,6 @@ async fn main() {
// Shards will automatically attempt to reconnect, and will perform exponential backoff until
// it reconnects.
if let Err(why) = discord_client.start().await {
println!("Client error: {why:?}");
println!("Client start error: {why:?}");
}
}
+3 -2
View File
@@ -3,11 +3,12 @@ name = "worker"
version = "0.1.0"
edition = "2024"
[package.metadata.cargo-machete]
ignored = ["symphonia"]
[dependencies]
async-nats = { version = "0.46.0" }
futures = { version = "0.3.32" }
futures-executor = { version = "0.3.32" }
nats = { path = "../../libs/nats" }
postcard = { version = "1.1.3", features = ["use-std"] }
rustls = { version = "0.23.37", default-features = false, features = [
"aws-lc-rs",
+6 -4
View File
@@ -24,7 +24,9 @@ impl EventHandler for Handler {
async fn ready(&self, ctx: Context, ready: Ready) {
println!("{} is connected!", ready.user.name);
let voice_manager = songbird::get(&ctx).await;
let voice_manager = songbird::get(&ctx)
.await
.expect("Cannot init voice manager");
let mut subscriber = self
.nats_client
@@ -65,9 +67,9 @@ impl EventHandler for Handler {
let response = match result {
Ok(response) => response,
Err(err) => JobResponse {
Err(why) => JobResponse {
content: None,
error: Some(err),
error: Some(why),
},
};
@@ -115,6 +117,6 @@ async fn main() {
.expect("Error creating discord client");
if let Err(why) = discord_client.start().await {
println!("Client error: {why:?}");
println!("Client start error: {why:?}");
}
}
+17 -8
View File
@@ -1,4 +1,7 @@
use types::jobs::{DownloadJob, DownloadResponse, JobResponse};
use types::{
error::{CorroError, CorroErrorType},
jobs::{DownloadJob, DownloadResponse, JobResponse},
};
use yt_dlp::{
Downloader,
model::{AudioCodecPreference, AudioQuality},
@@ -7,12 +10,15 @@ use yt_dlp::{
pub async fn download(
downloader: &Downloader,
job: DownloadJob,
) -> Result<JobResponse<DownloadResponse>, String> {
println!("job: {:?}", job);
) -> Result<JobResponse<DownloadResponse>, CorroError> {
let video = match downloader.fetch_video_infos(job.url).await {
Ok(video) => video,
Err(err) => return Err(err.to_string()),
Err(why) => {
return Err(CorroError {
error_type: CorroErrorType::YtdlpError,
message: why.to_string(),
});
}
};
let audio_path = match downloader
@@ -25,11 +31,14 @@ pub async fn download(
.await
{
Ok(path) => path,
Err(err) => return Err(err.to_string()),
Err(why) => {
return Err(CorroError {
error_type: CorroErrorType::YtdlpError,
message: why.to_string(),
});
}
};
println!("audio_path: {:?}", &audio_path);
Ok(JobResponse {
content: Some(DownloadResponse {
path: audio_path.clone(),
+30 -35
View File
@@ -6,46 +6,41 @@ use songbird::{
events::{Event, EventContext, EventHandler as VoiceEventHandler, TrackEvent},
input::File,
};
use types::jobs::{JobResponse, PlayJob, PlayResponse};
use types::{
error::CorroError,
jobs::{JobResponse, PlayJob, PlayResponse},
};
pub async fn play(
voice_manager: &Option<Arc<Songbird>>,
voice_manager: &Arc<Songbird>,
job: PlayJob,
) -> Result<JobResponse<PlayResponse>, String> {
println!("job: {:?}", job);
if let Some(voice_manager) = voice_manager {
if let Ok(handler_lock) = voice_manager.join(job.guild_id, job.channel_id).await {
// Attach an event handler to see notifications of all track errors.
let mut handler = handler_lock.lock().await;
handler.add_global_event(TrackEvent::Error.into(), TrackErrorNotifier);
}
let handler_lock = match voice_manager.get(job.guild_id) {
Some(handler) => handler,
None => {
return Ok(JobResponse {
content: Some(PlayResponse {}),
error: None,
});
}
};
) -> Result<JobResponse<PlayResponse>, CorroError> {
if let Ok(handler_lock) = voice_manager.join(job.guild_id, job.channel_id).await {
// Attach an event handler to see notifications of all track errors.
let mut handler = handler_lock.lock().await;
let src = File::new(job.path).clone();
let track_handle = handler.play_input(src.into());
println!("{:?}", track_handle.get_info().await);
Ok(JobResponse {
content: Some(PlayResponse {}),
error: None,
})
} else {
Err("No voice_manager defined".to_string())
handler.add_global_event(TrackEvent::Error.into(), TrackErrorNotifier);
}
let handler_lock = match voice_manager.get(job.guild_id) {
Some(handler) => handler,
None => {
return Ok(JobResponse {
content: Some(PlayResponse {}),
error: None,
});
}
};
let mut handler = handler_lock.lock().await;
let src = File::new(job.path).clone();
let _ = handler.play_input(src.into());
Ok(JobResponse {
content: Some(PlayResponse {}),
error: None,
})
}
struct TrackErrorNotifier;
+10 -7
View File
@@ -1,4 +1,5 @@
use types::{
error::{CorroError, CorroErrorType},
jobs::{JobResponse, SearchJob, SearchResponse},
misc::parse_url_or_default,
queue::YoutubeSong,
@@ -8,9 +9,7 @@ use yt_dlp::Downloader;
pub async fn search(
downloader: &Downloader,
job: SearchJob,
) -> Result<JobResponse<SearchResponse>, String> {
println!("job: {:?}", job);
) -> Result<JobResponse<SearchResponse>, CorroError> {
let result = match downloader
.youtube_extractor()
.search_first(&job.query)
@@ -32,16 +31,20 @@ pub async fn search(
},
None => JobResponse {
content: None,
error: Some("url is not defined".to_string()),
error: Some(CorroError {
error_type: CorroErrorType::JobError,
message: "webpage_url is not defined".to_string(),
}),
},
},
Err(why) => JobResponse {
content: None,
error: Some(format!("{why}")),
error: Some(CorroError {
error_type: CorroErrorType::JobError,
message: why.to_string(),
}),
},
};
println!("reply: {:?}", result.content);
Ok(result)
}