queue wip
This commit is contained in:
@@ -0,0 +1,54 @@
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use async_nats::jetstream::kv::Store;
|
||||
use async_trait::async_trait;
|
||||
use postcard::{from_bytes, to_stdvec};
|
||||
use types::{
|
||||
error::{CorroError, CorroErrorType},
|
||||
queue::Queue,
|
||||
};
|
||||
|
||||
fn get_key(id: NonZeroU64) -> String {
|
||||
format!("corro-dj.{id}")
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait KVClient {
|
||||
async fn get_queue(&self, id: NonZeroU64) -> Result<Queue, CorroError>;
|
||||
async fn set_queue(&self, id: NonZeroU64, queue: &Queue) -> Result<u64, CorroError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl KVClient for Store {
|
||||
async fn get_queue(&self, id: NonZeroU64) -> Result<Queue, CorroError> {
|
||||
match self.get(get_key(id)).await {
|
||||
Ok(result) => {
|
||||
if let Some(result) = result {
|
||||
Ok(from_bytes(&result).unwrap())
|
||||
} else {
|
||||
Err(CorroError {
|
||||
error_type: CorroErrorType::KVError,
|
||||
message: format!("key {} is undefined", get_key(id)),
|
||||
})
|
||||
}
|
||||
}
|
||||
Err(why) => Err(CorroError {
|
||||
error_type: CorroErrorType::KVError,
|
||||
message: why.to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
async fn set_queue(&self, id: NonZeroU64, queue: &Queue) -> Result<u64, CorroError> {
|
||||
match self
|
||||
.put(get_key(id), to_stdvec(&queue).unwrap().into())
|
||||
.await
|
||||
{
|
||||
Ok(sequence) => Ok(sequence),
|
||||
Err(why) => Err(CorroError {
|
||||
error_type: CorroErrorType::KVError,
|
||||
message: why.to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,3 @@
|
||||
pub mod job;
|
||||
pub mod kv;
|
||||
pub mod stream;
|
||||
|
||||
Reference in New Issue
Block a user