Files
urbanLifeline/difyPlugin/app/core/redis.py

129 lines
3.9 KiB
Python
Raw Normal View History

2025-12-30 13:38:32 +08:00
"""Redis 服务"""
import json
from typing import Any, Optional, Union
import redis.asyncio as redis
from redis.asyncio import Redis
from app.config import settings
class RedisService:
"""Redis 服务类"""
_client: Optional[Redis] = None
@classmethod
async def init(cls) -> None:
"""初始化 Redis 连接"""
cls._client = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
password=settings.REDIS_PASSWORD or None,
db=settings.REDIS_DB,
decode_responses=True
)
@classmethod
async def close(cls) -> None:
"""关闭 Redis 连接"""
if cls._client:
await cls._client.close()
cls._client = None
@classmethod
def get_client(cls) -> Redis:
"""获取 Redis 客户端"""
if not cls._client:
raise RuntimeError("Redis 未初始化,请先调用 init()")
return cls._client
# ==================== String 操作 ====================
@classmethod
async def get(cls, key: str) -> Optional[str]:
"""获取值"""
return await cls.get_client().get(key)
@classmethod
async def set(cls, key: str, value: Union[str, int, float], expire: Optional[int] = None) -> bool:
"""设置值"""
return await cls.get_client().set(key, value, ex=expire)
@classmethod
async def delete(cls, *keys: str) -> int:
"""删除键"""
return await cls.get_client().delete(*keys)
@classmethod
async def exists(cls, key: str) -> bool:
"""判断键是否存在"""
return await cls.get_client().exists(key) > 0
@classmethod
async def expire(cls, key: str, seconds: int) -> bool:
"""设置过期时间"""
return await cls.get_client().expire(key, seconds)
@classmethod
async def ttl(cls, key: str) -> int:
"""获取剩余过期时间"""
return await cls.get_client().ttl(key)
# ==================== JSON 操作 ====================
@classmethod
async def get_json(cls, key: str) -> Optional[Any]:
"""获取 JSON 值"""
value = await cls.get(key)
return json.loads(value) if value else None
@classmethod
async def set_json(cls, key: str, value: Any, expire: Optional[int] = None) -> bool:
"""设置 JSON 值"""
return await cls.set(key, json.dumps(value, ensure_ascii=False), expire)
# ==================== Hash 操作 ====================
@classmethod
async def hget(cls, name: str, key: str) -> Optional[str]:
"""获取 Hash 字段值"""
return await cls.get_client().hget(name, key)
@classmethod
async def hset(cls, name: str, key: str, value: str) -> int:
"""设置 Hash 字段值"""
return await cls.get_client().hset(name, key, value)
@classmethod
async def hgetall(cls, name: str) -> dict:
"""获取 Hash 所有字段"""
return await cls.get_client().hgetall(name)
@classmethod
async def hdel(cls, name: str, *keys: str) -> int:
"""删除 Hash 字段"""
return await cls.get_client().hdel(name, *keys)
# ==================== List 操作 ====================
@classmethod
async def lpush(cls, key: str, *values: str) -> int:
"""左侧插入列表"""
return await cls.get_client().lpush(key, *values)
@classmethod
async def rpush(cls, key: str, *values: str) -> int:
"""右侧插入列表"""
return await cls.get_client().rpush(key, *values)
@classmethod
async def lrange(cls, key: str, start: int = 0, end: int = -1) -> list:
"""获取列表范围"""
return await cls.get_client().lrange(key, start, end)
@classmethod
async def llen(cls, key: str) -> int:
"""获取列表长度"""
return await cls.get_client().llen(key)