136 lines
4.6 KiB
Python
136 lines
4.6 KiB
Python
|
|
import json
|
||
|
|
import os
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Dict, Any, Optional
|
||
|
|
from datetime import datetime
|
||
|
|
import threading
|
||
|
|
|
||
|
|
class DataStore:
|
||
|
|
"""数据持久化存储服务"""
|
||
|
|
|
||
|
|
def __init__(self, storage_dir: str = "data"):
|
||
|
|
self.storage_dir = Path(storage_dir)
|
||
|
|
self.storage_dir.mkdir(exist_ok=True)
|
||
|
|
|
||
|
|
# 数据文件路径
|
||
|
|
self.data_file = self.storage_dir / "reports_data.json"
|
||
|
|
|
||
|
|
# 内存缓存
|
||
|
|
self._cache: Dict[str, Any] = {}
|
||
|
|
|
||
|
|
# 线程锁,防止并发写入冲突
|
||
|
|
self._lock = threading.Lock()
|
||
|
|
|
||
|
|
# 启动时加载数据
|
||
|
|
self._load_data()
|
||
|
|
|
||
|
|
def _load_data(self):
|
||
|
|
"""从文件加载数据"""
|
||
|
|
if self.data_file.exists():
|
||
|
|
try:
|
||
|
|
with open(self.data_file, 'r', encoding='utf-8') as f:
|
||
|
|
self._cache = json.load(f)
|
||
|
|
print(f"✓ 成功加载 {len(self._cache)} 份报告数据")
|
||
|
|
except Exception as e:
|
||
|
|
print(f"⚠ 加载数据失败: {e},将使用空数据")
|
||
|
|
self._cache = {}
|
||
|
|
else:
|
||
|
|
print("✓ 数据文件不存在,将创建新文件")
|
||
|
|
self._cache = {}
|
||
|
|
|
||
|
|
def _save_data(self):
|
||
|
|
"""保存数据到文件"""
|
||
|
|
try:
|
||
|
|
with self._lock:
|
||
|
|
# 创建临时文件,避免写入过程中断导致数据损坏
|
||
|
|
temp_file = self.data_file.with_suffix('.json.tmp')
|
||
|
|
with open(temp_file, 'w', encoding='utf-8') as f:
|
||
|
|
json.dump(self._cache, f, ensure_ascii=False, indent=2)
|
||
|
|
|
||
|
|
# 原子性替换
|
||
|
|
temp_file.replace(self.data_file)
|
||
|
|
except Exception as e:
|
||
|
|
print(f"⚠ 保存数据失败: {e}")
|
||
|
|
|
||
|
|
def get_all(self) -> Dict[str, Any]:
|
||
|
|
"""获取所有报告数据"""
|
||
|
|
return self._cache.copy()
|
||
|
|
|
||
|
|
def get(self, file_id: str) -> Optional[Dict[str, Any]]:
|
||
|
|
"""获取单个报告数据"""
|
||
|
|
return self._cache.get(file_id)
|
||
|
|
|
||
|
|
def set(self, file_id: str, data: Dict[str, Any]) -> None:
|
||
|
|
"""设置/更新报告数据"""
|
||
|
|
self._cache[file_id] = data
|
||
|
|
self._save_data()
|
||
|
|
|
||
|
|
def update(self, file_id: str, updates: Dict[str, Any]) -> None:
|
||
|
|
"""更新报告数据的部分字段"""
|
||
|
|
if file_id in self._cache:
|
||
|
|
self._cache[file_id].update(updates)
|
||
|
|
self._save_data()
|
||
|
|
else:
|
||
|
|
raise KeyError(f"报告 {file_id} 不存在")
|
||
|
|
|
||
|
|
def delete(self, file_id: str) -> None:
|
||
|
|
"""删除报告数据"""
|
||
|
|
if file_id in self._cache:
|
||
|
|
del self._cache[file_id]
|
||
|
|
self._save_data()
|
||
|
|
|
||
|
|
def exists(self, file_id: str) -> bool:
|
||
|
|
"""检查报告是否存在"""
|
||
|
|
return file_id in self._cache
|
||
|
|
|
||
|
|
def count(self) -> int:
|
||
|
|
"""获取报告总数"""
|
||
|
|
return len(self._cache)
|
||
|
|
|
||
|
|
def cleanup_orphaned_files(self, upload_dir: Path) -> int:
|
||
|
|
"""清理孤立的文件(数据库中有记录但文件不存在)"""
|
||
|
|
cleaned = 0
|
||
|
|
orphaned_ids = []
|
||
|
|
|
||
|
|
for file_id, report in self._cache.items():
|
||
|
|
filepath = report.get('filepath')
|
||
|
|
if filepath and not os.path.exists(filepath):
|
||
|
|
orphaned_ids.append(file_id)
|
||
|
|
|
||
|
|
for file_id in orphaned_ids:
|
||
|
|
self.delete(file_id)
|
||
|
|
cleaned += 1
|
||
|
|
|
||
|
|
if cleaned > 0:
|
||
|
|
print(f"✓ 清理了 {cleaned} 条孤立记录")
|
||
|
|
|
||
|
|
return cleaned
|
||
|
|
|
||
|
|
def export_backup(self, backup_path: Optional[str] = None) -> str:
|
||
|
|
"""导出备份"""
|
||
|
|
if backup_path is None:
|
||
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
|
backup_path = self.storage_dir / f"backup_{timestamp}.json"
|
||
|
|
else:
|
||
|
|
backup_path = Path(backup_path)
|
||
|
|
|
||
|
|
with open(backup_path, 'w', encoding='utf-8') as f:
|
||
|
|
json.dump(self._cache, f, ensure_ascii=False, indent=2)
|
||
|
|
|
||
|
|
print(f"✓ 数据已备份到: {backup_path}")
|
||
|
|
return str(backup_path)
|
||
|
|
|
||
|
|
def import_backup(self, backup_path: str) -> None:
|
||
|
|
"""从备份恢复数据"""
|
||
|
|
backup_path = Path(backup_path)
|
||
|
|
if not backup_path.exists():
|
||
|
|
raise FileNotFoundError(f"备份文件不存在: {backup_path}")
|
||
|
|
|
||
|
|
with open(backup_path, 'r', encoding='utf-8') as f:
|
||
|
|
imported_data = json.load(f)
|
||
|
|
|
||
|
|
self._cache.update(imported_data)
|
||
|
|
self._save_data()
|
||
|
|
|
||
|
|
print(f"✓ 已从备份恢复 {len(imported_data)} 份报告")
|