# File: yiliao/backend/services/data_store.py
# (136 lines, 4.6 KiB, Python)
import json
import os
from pathlib import Path
from typing import Dict, Any, Optional
from datetime import datetime
import threading
class DataStore:
    """JSON-file-backed persistence service for report data.

    All records live in an in-memory dict mirrored to ``reports_data.json``
    under ``storage_dir``.  Every mutating method re-serializes the whole
    cache to disk via an atomic temp-file rename, so a crash mid-write can
    never leave a half-written data file.  A re-entrant lock guards every
    cache access, so mutators are safe to call from multiple threads.
    """

    def __init__(self, storage_dir: str = "data"):
        """Create the store, ensure ``storage_dir`` exists, and load any
        previously persisted data from disk.
        """
        self.storage_dir = Path(storage_dir)
        # parents=True so a nested path like "var/app/data" also works.
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        # Path of the persisted JSON data file.
        self.data_file = self.storage_dir / "reports_data.json"
        # In-memory cache: file_id -> report record.
        self._cache: Dict[str, Any] = {}
        # Re-entrant lock.  Mutators take it around the whole
        # read-modify-write AND the save, so concurrent set/update/delete
        # cannot interleave.  (Previously only the save step was locked,
        # leaving the cache mutation itself racy; RLock lets _save_data
        # re-acquire it safely.)
        self._lock = threading.RLock()
        # Populate the cache from disk at startup.
        self._load_data()

    def _load_data(self) -> None:
        """Load persisted data from the JSON file into the cache.

        Any load failure (corrupt JSON, I/O error) is reported on stdout
        and the store falls back to an empty cache rather than crashing.
        """
        if self.data_file.exists():
            try:
                with open(self.data_file, 'r', encoding='utf-8') as f:
                    self._cache = json.load(f)
                print(f"✓ 成功加载 {len(self._cache)} 份报告数据")
            except Exception as e:
                print(f"⚠ 加载数据失败: {e},将使用空数据")
                self._cache = {}
        else:
            print("✓ 数据文件不存在,将创建新文件")
            self._cache = {}

    def _save_data(self) -> None:
        """Persist the cache to disk atomically (temp file + rename).

        Best-effort: failures are reported on stdout, the in-memory state
        is kept, and no exception propagates to the caller.
        """
        try:
            with self._lock:
                # Write to a sibling temp file first so an interrupted
                # write cannot corrupt the live data file.
                temp_file = self.data_file.with_suffix('.json.tmp')
                with open(temp_file, 'w', encoding='utf-8') as f:
                    json.dump(self._cache, f, ensure_ascii=False, indent=2)
                # Atomic replacement (same filesystem, same directory).
                temp_file.replace(self.data_file)
        except Exception as e:
            print(f"⚠ 保存数据失败: {e}")

    def get_all(self) -> Dict[str, Any]:
        """Return a shallow copy of all report records."""
        with self._lock:
            return self._cache.copy()

    def get(self, file_id: str) -> Optional[Dict[str, Any]]:
        """Return the record for ``file_id``, or ``None`` if absent."""
        with self._lock:
            return self._cache.get(file_id)

    def set(self, file_id: str, data: Dict[str, Any]) -> None:
        """Insert or replace the record for ``file_id`` and persist."""
        with self._lock:
            self._cache[file_id] = data
            self._save_data()

    def update(self, file_id: str, updates: Dict[str, Any]) -> None:
        """Merge ``updates`` into an existing record and persist.

        Raises:
            KeyError: if ``file_id`` is not in the store.
        """
        with self._lock:
            if file_id not in self._cache:
                raise KeyError(f"报告 {file_id} 不存在")
            self._cache[file_id].update(updates)
            self._save_data()

    def delete(self, file_id: str) -> None:
        """Remove the record for ``file_id`` if present (no-op otherwise)."""
        with self._lock:
            if self._cache.pop(file_id, None) is not None:
                self._save_data()

    def exists(self, file_id: str) -> bool:
        """Return True if a record exists for ``file_id``."""
        with self._lock:
            return file_id in self._cache

    def count(self) -> int:
        """Return the number of stored records."""
        with self._lock:
            return len(self._cache)

    def cleanup_orphaned_files(self, upload_dir: Path) -> int:
        """Drop records whose ``filepath`` no longer exists on disk.

        ``upload_dir`` is accepted for interface compatibility but is not
        consulted; each record's own ``filepath`` field is checked.
        Saves the data file once for the whole batch (the original
        re-saved the entire file once per deleted record).

        Returns:
            The number of records removed.
        """
        with self._lock:
            orphaned_ids = [
                file_id
                for file_id, report in self._cache.items()
                if report.get('filepath') and not os.path.exists(report['filepath'])
            ]
            for file_id in orphaned_ids:
                del self._cache[file_id]
            if orphaned_ids:
                self._save_data()
                print(f"✓ 清理了 {len(orphaned_ids)} 条孤立记录")
        return len(orphaned_ids)

    def export_backup(self, backup_path: Optional[str] = None) -> str:
        """Write a snapshot of all records to ``backup_path``.

        Defaults to a timestamped ``backup_YYYYmmdd_HHMMSS.json`` inside
        ``storage_dir``.

        Returns:
            The path written to, as a string.
        """
        if backup_path is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = self.storage_dir / f"backup_{timestamp}.json"
        else:
            backup_path = Path(backup_path)
        with self._lock:
            with open(backup_path, 'w', encoding='utf-8') as f:
                json.dump(self._cache, f, ensure_ascii=False, indent=2)
        print(f"✓ 数据已备份到: {backup_path}")
        return str(backup_path)

    def import_backup(self, backup_path: str) -> None:
        """Merge records from a backup file into the store and persist.

        Existing records with the same id are overwritten by the backup's
        version; other records are kept.

        Raises:
            FileNotFoundError: if ``backup_path`` does not exist.
        """
        backup_path = Path(backup_path)
        if not backup_path.exists():
            raise FileNotFoundError(f"备份文件不存在: {backup_path}")
        with open(backup_path, 'r', encoding='utf-8') as f:
            imported_data = json.load(f)
        with self._lock:
            self._cache.update(imported_data)
            self._save_data()
        print(f"✓ 已从备份恢复 {len(imported_data)} 份报告")