Files
schoolNews/schoolNewsServ/.bin/mysql/sql/sensitiveData/writeWord.py

203 lines
6.1 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
敏感词批量导入脚本
sensitive_word_dict.txt 文件读取敏感词并批量插入到数据库
"""
import pymysql
import os
import sys
2025-11-22 16:01:36 +08:00
import argparse
from datetime import datetime
# 数据库配置
DB_CONFIG = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': '123456',
'database': 'school_news',
'charset': 'utf8mb4'
}
def get_db_connection():
"""获取数据库连接"""
try:
connection = pymysql.connect(**DB_CONFIG)
return connection
except Exception as e:
print(f"数据库连接失败: {e}")
return None
def read_sensitive_words(file_path):
"""读取敏感词文件"""
words = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
word = line.strip()
if word and len(word) > 0:
words.append(word)
print(f"成功读取 {len(words)} 个敏感词")
return words
except Exception as e:
print(f"读取敏感词文件失败: {e}")
return []
def batch_insert_words(connection, words, batch_size=1000):
"""批量插入敏感词到数据库"""
cursor = connection.cursor()
try:
# 清空现有的deny类型敏感词可选
print("清理现有的deny类型敏感词...")
cursor.execute("DELETE FROM tb_sensitive_word WHERE type = 'deny'")
# 准备批量插入SQL
insert_sql = "INSERT INTO tb_sensitive_word (word, type) VALUES (%s, %s)"
# 分批插入
total_words = len(words)
inserted_count = 0
for i in range(0, total_words, batch_size):
batch_words = words[i:i + batch_size]
batch_data = [(word, 'deny') for word in batch_words]
try:
cursor.executemany(insert_sql, batch_data)
connection.commit()
inserted_count += len(batch_data)
print(f"已插入 {inserted_count}/{total_words} 个敏感词")
except Exception as e:
print(f"批量插入失败: {e}")
connection.rollback()
break
print(f"批量插入完成,共插入 {inserted_count} 个敏感词")
return inserted_count
except Exception as e:
print(f"批量插入过程中发生错误: {e}")
connection.rollback()
return 0
finally:
cursor.close()
def check_duplicates(connection, words):
"""检查重复的敏感词"""
cursor = connection.cursor()
try:
# 查询已存在的敏感词
cursor.execute("SELECT word FROM tb_sensitive_word WHERE type = 'deny'")
existing_words = set(row[0] for row in cursor.fetchall())
# 过滤重复词
new_words = [word for word in words if word not in existing_words]
duplicate_count = len(words) - len(new_words)
if duplicate_count > 0:
print(f"发现 {duplicate_count} 个重复敏感词,将跳过")
return new_words
except Exception as e:
print(f"检查重复词时发生错误: {e}")
return words
finally:
cursor.close()
def main():
"""主函数"""
2025-11-22 16:01:36 +08:00
# 解析命令行参数
parser = argparse.ArgumentParser(description='敏感词批量导入工具')
parser.add_argument('-y', '--yes', action='store_true',
help='自动确认导入,跳过交互式确认')
parser.add_argument('--file', type=str,
help='指定敏感词文件路径(默认: sensitive_word_dict.txt')
args = parser.parse_args()
print("=" * 50)
print("敏感词批量导入工具")
print("=" * 50)
2025-11-22 16:01:36 +08:00
# 获取敏感词文件路径
if args.file:
dict_file = args.file
else:
script_dir = os.path.dirname(os.path.abspath(__file__))
dict_file = os.path.join(script_dir, 'sensitive_word_dict.txt')
# 检查敏感词文件是否存在
if not os.path.exists(dict_file):
print(f"敏感词文件不存在: {dict_file}")
return
# 读取敏感词
print("正在读取敏感词文件...")
words = read_sensitive_words(dict_file)
if not words:
print("没有读取到有效的敏感词")
return
# 连接数据库
print("正在连接数据库...")
connection = get_db_connection()
if not connection:
print("数据库连接失败,程序退出")
return
try:
# 检查重复词(可选,如果不需要可以注释掉)
# print("正在检查重复敏感词...")
# words = check_duplicates(connection, words)
if not words:
print("所有敏感词都已存在,无需导入")
return
# 确认导入
print(f"准备导入 {len(words)} 个敏感词到数据库")
2025-11-22 16:01:36 +08:00
if args.yes:
print("自动确认模式,开始导入...")
else:
try:
confirm = input("是否继续?(y/N): ").strip().lower()
if confirm != 'y':
print("用户取消导入")
return
except EOFError:
print("检测到非交互式环境,自动确认导入...")
except KeyboardInterrupt:
print("\n用户中断导入")
return
# 批量插入
print("开始批量导入敏感词...")
start_time = datetime.now()
inserted_count = batch_insert_words(connection, words)
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print("=" * 50)
print(f"导入完成!")
print(f"成功导入: {inserted_count} 个敏感词")
print(f"耗时: {duration:.2f}")
print("=" * 50)
except Exception as e:
print(f"程序执行过程中发生错误: {e}")
finally:
connection.close()
print("数据库连接已关闭")
if __name__ == "__main__":
main()