2023-04-27 23:19:18 +00:00
|
|
|
from logging import Logger
|
|
|
|
from redis.asyncio.lock import Lock
|
2023-04-23 22:48:31 +00:00
|
|
|
import redis.asyncio as redis
|
|
|
|
import pickle
|
|
|
|
|
|
|
|
from config import RedisConfig
|
|
|
|
|
|
|
|
|
|
|
|
class Redis:
    """Async helper around a redis-py client for the ``djembe:queue:`` keys.

    Values are pickled before being written and unpickled when read back.
    Lock objects are created on first use and cached per instance.

    NOTE(review): lock names and value keys are built from the same prefix,
    so ``acquire(k)`` and ``set(k, ...)`` with the same ``k`` address the same
    Redis key. Presumably callers pass distinct keys for locks and values --
    confirm against the call sites before relying on both for one key.
    """

    # Single source of truth for the namespace used by values and locks.
    _KEY_PREFIX = "djembe:queue:"

    def __init__(self, logger: Logger, config: RedisConfig) -> None:
        """Create the client from ``config`` and remember ``logger``.

        Args:
            logger: Logger used to trace ``get`` / ``set`` calls.
            config: Provides the ``host``, ``port`` and ``password`` to use.
        """
        self._client = redis.Redis(
            host=config.host,
            port=config.port,
            password=config.password,
            # The pool is not closed implicitly; close() disconnects it
            # explicitly instead.
            auto_close_connection_pool=False,
        )
        # Cache of lock objects keyed by their full (prefixed) name.
        # NOTE: entries are never evicted, so this grows with the number of
        # distinct keys that are locked.
        self._locks: dict[str, Lock] = {}
        self.logger = logger

    def _key(self, key: str) -> str:
        """Return ``key`` with the shared namespace prefix applied."""
        return f"{self._KEY_PREFIX}{key}"

    async def _get_lock(self, key: str) -> Lock:
        """Return the cached lock for ``key``, creating it on first use.

        Args:
            key: Full lock name (already prefixed by the caller).
        """
        if key not in self._locks:
            self._locks[key] = self._client.lock(key)
        return self._locks[key]

    async def acquire(self, key: str) -> None:
        """Acquire the lock associated with ``key``."""
        lock = await self._get_lock(self._key(key))
        await lock.acquire()

    async def release(self, key: str) -> None:
        """Release the lock associated with ``key``."""
        lock = await self._get_lock(self._key(key))
        await lock.release()

    async def get(self, key: str):
        """Fetch and unpickle the value stored under ``key``.

        Returns:
            The unpickled object, or ``None`` when the stored value is
            missing or empty.
        """
        self.logger.info("get value %s from redis", key)
        value = await self._client.get(self._key(key))
        if not value:
            return None
        # SECURITY: pickle.loads can execute arbitrary code. This is only
        # safe while every writer to this Redis keyspace is trusted.
        return pickle.loads(value)

    async def set(self, key: str, value) -> None:
        """Pickle ``value`` and store it under ``key``."""
        self.logger.info("set value %s to redis", key)
        await self._client.set(self._key(key), pickle.dumps(value))

    async def close(self) -> None:
        """Close the client and disconnect its connection pool.

        The client is built with ``auto_close_connection_pool=False``, so a
        bare ``close()`` would leave the pool's connections open; the pool
        has to be closed explicitly here.
        """
        await self._client.close(close_connection_pool=True)
|