# Source file: yiliao/backend/services/data_store.py (136 lines, 4.6 KiB, Python)
import json
import os
from pathlib import Path
from typing import Dict, Any, Optional
from datetime import datetime
import threading
class DataStore:
    """Persistent key-value store for report records, backed by a JSON file.

    Records live in an in-memory dict and are written through to
    ``<storage_dir>/reports_data.json`` on every mutation.  All public
    methods are thread-safe: a single reentrant lock guards each cache
    mutation together with its save, so concurrent writers can neither
    corrupt the file nor mutate the dict while it is being serialized.
    (The original locked only inside ``_save_data``, leaving every cache
    mutation unguarded.)
    """

    def __init__(self, storage_dir: str = "data"):
        """Create the store and load any previously saved data.

        Args:
            storage_dir: directory holding the JSON data file; created
                (including intermediate directories) if it does not exist.
        """
        self.storage_dir = Path(storage_dir)
        # Fix: parents=True — plain mkdir(exist_ok=True) raised for nested
        # paths such as "var/app/data" whose parents don't exist yet.
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        # Path of the persisted JSON document.
        self.data_file = self.storage_dir / "reports_data.json"
        # In-memory cache: file_id -> report record.
        self._cache: Dict[str, Any] = {}
        # RLock, not Lock: public methods hold the lock across the
        # mutate+save sequence while _save_data re-acquires it, which
        # would deadlock with a plain non-reentrant Lock.
        self._lock = threading.RLock()
        # Populate the cache from disk on startup.
        self._load_data()

    def _load_data(self) -> None:
        """Populate the cache from the data file; fall back to {} on error."""
        if self.data_file.exists():
            try:
                with open(self.data_file, 'r', encoding='utf-8') as f:
                    self._cache = json.load(f)
                print(f"✓ 成功加载 {len(self._cache)} 份报告数据")
            # Narrowed from bare Exception: only I/O and JSON-decoding
            # failures are expected (JSONDecodeError is a ValueError).
            except (OSError, ValueError) as e:
                print(f"⚠ 加载数据失败: {e},将使用空数据")
                self._cache = {}
        else:
            print("✓ 数据文件不存在,将创建新文件")
            self._cache = {}

    def _save_data(self) -> None:
        """Atomically write the cache to disk (temp file, then rename).

        Best-effort persistence: failures are reported, never raised.
        """
        try:
            with self._lock:
                # Write to a sibling temp file first so a crash mid-write
                # cannot truncate or corrupt the real data file.
                temp_file = self.data_file.with_suffix('.json.tmp')
                with open(temp_file, 'w', encoding='utf-8') as f:
                    json.dump(self._cache, f, ensure_ascii=False, indent=2)
                # Atomic replace on POSIX; best-effort on Windows.
                temp_file.replace(self.data_file)
        # TypeError/ValueError cover non-JSON-serializable records, which
        # the original's bare Exception also swallowed.
        except (OSError, TypeError, ValueError) as e:
            print(f"⚠ 保存数据失败: {e}")

    def get_all(self) -> Dict[str, Any]:
        """Return a shallow copy of all report records, keyed by file id."""
        with self._lock:
            return self._cache.copy()

    def get(self, file_id: str) -> Optional[Dict[str, Any]]:
        """Return the record for *file_id*, or None if absent."""
        with self._lock:
            return self._cache.get(file_id)

    def set(self, file_id: str, data: Dict[str, Any]) -> None:
        """Insert or replace the record for *file_id* and persist."""
        # Fix: the original mutated the cache outside the lock, so a
        # concurrent save could serialize the dict mid-mutation.
        with self._lock:
            self._cache[file_id] = data
            self._save_data()

    def update(self, file_id: str, updates: Dict[str, Any]) -> None:
        """Merge *updates* into an existing record and persist.

        Raises:
            KeyError: if *file_id* is not in the store.
        """
        with self._lock:
            if file_id not in self._cache:
                raise KeyError(f"报告 {file_id} 不存在")
            self._cache[file_id].update(updates)
            self._save_data()

    def delete(self, file_id: str) -> None:
        """Remove the record for *file_id* (no-op if absent) and persist."""
        with self._lock:
            if self._cache.pop(file_id, None) is not None:
                self._save_data()

    def exists(self, file_id: str) -> bool:
        """Return True if *file_id* has a record."""
        with self._lock:
            return file_id in self._cache

    def count(self) -> int:
        """Return the number of stored records."""
        with self._lock:
            return len(self._cache)

    def cleanup_orphaned_files(self, upload_dir: Path) -> int:
        """Drop records whose referenced file no longer exists on disk.

        Args:
            upload_dir: kept for interface compatibility.
                NOTE(review): currently unused — each record's own
                'filepath' field is checked directly; confirm with callers.

        Returns:
            Number of records removed.
        """
        with self._lock:
            orphaned_ids = [
                file_id
                for file_id, report in self._cache.items()
                if report.get('filepath') and not os.path.exists(report['filepath'])
            ]
            for file_id in orphaned_ids:
                del self._cache[file_id]
            # Fix: one save for the whole batch — the original rewrote the
            # entire JSON file once per deleted record via self.delete().
            if orphaned_ids:
                self._save_data()
                print(f"✓ 清理了 {len(orphaned_ids)} 条孤立记录")
        return len(orphaned_ids)

    def export_backup(self, backup_path: Optional[str] = None) -> str:
        """Write a snapshot of all records to a backup file.

        Args:
            backup_path: destination path; defaults to a timestamped file
                inside ``storage_dir``.

        Returns:
            The backup path as a string.
        """
        if backup_path is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = self.storage_dir / f"backup_{timestamp}.json"
        else:
            backup_path = Path(backup_path)
        # Snapshot under the lock so the dump sees a consistent state,
        # then serialize outside it to keep the critical section short.
        with self._lock:
            snapshot = dict(self._cache)
        with open(backup_path, 'w', encoding='utf-8') as f:
            json.dump(snapshot, f, ensure_ascii=False, indent=2)
        print(f"✓ 数据已备份到: {backup_path}")
        return str(backup_path)

    def import_backup(self, backup_path: str) -> None:
        """Merge records from a backup file into the store and persist.

        Raises:
            FileNotFoundError: if *backup_path* does not exist.
            json.JSONDecodeError: if the backup is not valid JSON.
        """
        backup_path = Path(backup_path)
        if not backup_path.exists():
            raise FileNotFoundError(f"备份文件不存在: {backup_path}")
        with open(backup_path, 'r', encoding='utf-8') as f:
            imported_data = json.load(f)
        # Fix: merge + save under the lock (original mutated unguarded).
        with self._lock:
            self._cache.update(imported_data)
            self._save_data()
        print(f"✓ 已从备份恢复 {len(imported_data)} 份报告")