This commit is contained in:
2026-03-17 00:18:09 +01:00
parent 1564a4de44
commit a679760b55
6 changed files with 112 additions and 61 deletions
+10 -5
View File
@@ -1,4 +1,4 @@
use nats_libs::functions::JobClient;
use nats_libs::job::JobClient;
use serenity::{
all::{CommandInteraction, Context, CreateCommandOption},
builder::CreateCommand,
@@ -12,10 +12,12 @@ use types::{
};
use url::Url;
use crate::common::Services;
pub async fn run(
services: &Services,
ctx: &Context,
interaction: &CommandInteraction,
nats_client: &async_nats::Client,
) -> Result<(), CorroError> {
let options = interaction.data.options();
@@ -36,7 +38,8 @@ pub async fn run(
let is_url = value.starts_with("https://") || value.starts_with("http://");
if !is_url {
let search_response = match nats_client
let search_response = match services
.nats_client
.send_job(JobsMap::Search(SearchJob {
uuid: new_uuid_v4(),
query: value.to_string(),
@@ -60,7 +63,8 @@ pub async fn run(
url = parse_url_or_default(value.to_string());
}
let download_response = match nats_client
let download_response = match services
.nats_client
.send_job(JobsMap::Download(DownloadJob {
uuid: new_uuid_v4(),
url,
@@ -88,7 +92,8 @@ pub async fn run(
.channel_id
.unwrap();
match nats_client
match services
.nats_client
.send_job(JobsMap::Play(PlayJob {
uuid: new_uuid_v4(),
path: download_response.path,
+6
View File
@@ -0,0 +1,6 @@
pub struct Services {
pub nats_client: async_nats::Client,
pub jetstream_client: async_nats::jetstream::Context,
pub jetstream_stream: async_nats::jetstream::stream::Stream,
pub jetstream_kv: async_nats::jetstream::kv::Store,
}
+48 -55
View File
@@ -1,13 +1,13 @@
mod commands;
mod common;
use std::env;
use async_nats::jetstream::{self, context::traits::Requester};
use async_nats::jetstream;
use nats_libs::stream::{JetstreamClient, StreamClient};
use serenity::{
Client,
all::{Context, EventHandler, GatewayIntents},
async_trait,
futures::future::ErrInto,
model::{
application::{Command, Interaction},
gateway::Ready,
@@ -16,8 +16,10 @@ use serenity::{
use serenity_libs::functions::CustomInteraction;
use types::error::{CorroError, CorroErrorType};
use crate::common::Services;
struct Handler {
nats_client: async_nats::Client,
services: Services,
}
#[async_trait]
@@ -27,11 +29,26 @@ impl EventHandler for Handler {
println!("Received command interaction: {command:?}");
if let Err(why) = match command.data.name.as_str() {
"play" => commands::play::run(&ctx, &command, &self.nats_client).await,
"play" => {
let guild_id = command.guild_id.unwrap();
if let Ok(sequence) = self
.services
.jetstream_client
.lock(guild_id.into())
.await
{
let result = commands::play::run(&self.services, &ctx, &command).await;
let _ = self.services.jetstream_stream.unlock(sequence).await;
result
} else {
Err(CorroError { error_type: CorroErrorType::NatsError, message: format!("Cannot lock {guild_id}") })
}
}
"ping" => commands::ping::run(&ctx, &command).await,
_ => Err(CorroError {
error_type: CorroErrorType::CommandError,
message: format!("Command {} not implemented", &command.data.name),
message: format!("Command {} not implemented :(", &command.data.name),
}),
} {
println!("{:?}: {}", why.error_type, why.message);
@@ -75,7 +92,15 @@ async fn main() {
let jetstream_client = jetstream::new(nats_client.clone());
let stream = jetstream_client
let jetstream_kv = jetstream_client
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "profiles".to_string(),
..Default::default()
})
.await
.expect("Error creating queue kv");
let jetstream_stream = jetstream_client
.create_stream(jetstream::stream::Config {
name: "LOCKS".to_string(),
subjects: vec!["locks.*".to_string()],
@@ -88,54 +113,22 @@ async fn main() {
.await
.expect("Error creating locks stream");
println!("created the stream");
let handler = Handler {
services: Services {
nats_client,
jetstream_client,
jetstream_stream,
jetstream_kv,
},
};
match jetstream_client.publish("locks.a", "test".into()).await.unwrap().await {
Ok(a) => {
println!("publish ok {:?}", a);
let mut discord_client =
serenity::Client::builder(discord_token, GatewayIntents::non_privileged())
.event_handler(handler)
.await
.expect("Error creating discord client");
if let Err(why) = discord_client.start().await {
println!("Client start error: {why:?}");
}
Err(why) => {
println!("{why:?}");
}
}
// match jetstream_client.request("locks.a", "test").await {
// Ok(a) => {
// println!("publish ok {a:?}");
// Ok("a")
// }
// Err(why) => {
// println!("{why:?}");
// Err("a")
// }
// };
// match jetstream_client.publish("locks.b", "test".into()).await {
// Ok(a) => {
// println!("publish ok {a:?}");
// }
// Err(why) => {
// println!("{why:?}");
// }
// };
// match jetstream_client.publish("locks.a", "test".into()).await {
// Ok(a) => {
// println!("publish ok {a:?}");
// }
// Err(why) => {
// println!("{why:?}");
// }
// };
println!("done");
// let handler = Handler { nats_client };
// let mut discord_client = Client::builder(discord_token, GatewayIntents::non_privileged())
// .event_handler(handler)
// .await
// .expect("Error creating discord client");
// if let Err(why) = discord_client.start().await {
// println!("Client start error: {why:?}");
// }
}
+2 -1
View File
@@ -1 +1,2 @@
pub mod functions;
pub mod job;
pub mod stream;
+46
View File
@@ -0,0 +1,46 @@
use std::num::NonZeroU64;
use async_nats::jetstream::{Context, stream::Stream};
use async_trait::async_trait;
use types::error::{CorroError, CorroErrorType};
#[async_trait]
pub trait JetstreamClient {
async fn lock(&self, id: NonZeroU64) -> Result<u64, CorroError>;
}
#[async_trait]
impl JetstreamClient for Context {
async fn lock(&self, id: NonZeroU64) -> Result<u64, CorroError> {
match self
.publish(format!("locks.{id}"), "".into())
.await
.unwrap()
.await
{
Ok(ack) => Ok(ack.sequence),
Err(why) => Err(CorroError {
error_type: CorroErrorType::NatsError,
message: why.to_string(),
}),
}
}
}
#[async_trait]
pub trait StreamClient {
async fn unlock(&self, sequence: u64) -> Result<bool, CorroError>;
}
#[async_trait]
impl StreamClient for Stream {
async fn unlock(&self, sequence: u64) -> Result<bool, CorroError> {
match self.delete_message(sequence).await {
Ok(result) => Ok(result),
Err(why) => Err(CorroError {
error_type: CorroErrorType::NatsError,
message: why.to_string(),
}),
}
}
}