Everything 的确是一个很好的工具,所以我仿照它开发了一个文件搜索器。
[code]# -*- coding: utf-8 -*-import sysimport osimport sqlite3import threadingfrom pathlib import Pathfrom PyQt5.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit, QListWidget, QLabel, QCheckBox, QProgressBar, QMessageBox, QGridLayout, QGroupBox)from PyQt5.QtCore import Qt, pyqtSignal, QObjectclass IndexerSignals(QObject): count_update = pyqtSignal(int) status = pyqtSignal(str) current_file = pyqtSignal(str) finished = pyqtSignal(int) error = pyqtSignal(str)class FileIndexer(QObject): def __init__(self): super().__init__() self.signals = IndexerSignals() self.stop_requested = False def stop(self): self.stop_requested = True def build_index(self, roots): db_path = Path.home() / ".file_searcher_db.sqlite" # 删除旧库,确保干净重建 if db_path.exists(): try: os.remove(db_path) except: pass try: conn = sqlite3.connect(str(db_path)) cur = conn.cursor() cur.execute(""" CREATE VIRTUAL TABLE IF NOT EXISTS files USING fts5( name, path UNINDEXED, -- UNINDEXED 表示该字段存储但不参与全文索引(节省空间且避免误搜) is_dir UNINDEXED, tokenize = 'unicode61' ); """) conn.commit() processed_count = 0 batch = [] self.signals.status.emit("正在极速扫描中...") for root in roots: if self.stop_requested: break for dirpath, dirnames, filenames in os.walk(root): if self.stop_requested: break # 1. 处理文件夹 for dname in dirnames: full_path = os.path.join(dirpath, dname) batch.append((dname, full_path, 1)) # 1 代表文件夹 processed_count += 1 # 2. 
处理文件 for fname in filenames: full_path = os.path.join(dirpath, fname) batch.append((fname, full_path, 0)) # 0 代表文件 processed_count += 1 if len(batch) >= 2000: cur.executemany("INSERT INTO files(name, path, is_dir) VALUES (?, ?, ?)", batch) conn.commit() batch.clear() # 发送信号更新界面 self.signals.count_update.emit(processed_count) self.signals.current_file.emit(dirpath) # 显示当前扫描到的目录即可 # 写入剩余的数据 if batch: cur.executemany("INSERT INTO files(name, path, is_dir) VALUES (?, ?, ?)", batch) conn.commit() conn.close() if not self.stop_requested: self.signals.finished.emit(processed_count) else: self.signals.status.emit("索引已停止") except Exception as e: self.signals.error.emit(str(e))class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("极速文件搜索器 v3.0 (逻辑修正版)") self.setGeometry(200, 100, 950, 700) self.db_path = Path.home() / ".file_searcher_db.sqlite" self.indexer = FileIndexer() self.thread = None self.init_ui() def init_ui(self): central = QWidget() self.setCentralWidget(central) layout = QVBoxLayout(central) # === 1. 盘符选择 === group_disk = QGroupBox("1. 索引设置") h_disk = QHBoxLayout() self.drives_layout = QGridLayout() self.checkboxes = {} self.refresh_drives() # 初始化盘符 btn_refresh = QPushButton("刷新盘符") btn_refresh.clicked.connect(self.refresh_drives) h_disk.addLayout(self.drives_layout) h_disk.addWidget(btn_refresh) group_disk.setLayout(h_disk) # === 2. 控制台与进度 === group_ctrl = QGroupBox("2. 
索引控制") v_ctrl = QVBoxLayout() h_btns = QHBoxLayout() self.btn_update = QPushButton("开始建立索引") self.btn_update.setStyleSheet("background-color: #2E7D32; color: white; font-weight: bold; padding: 8px;") self.btn_update.clicked.connect(self.start_indexing) self.btn_stop = QPushButton("停止") self.btn_stop.setStyleSheet("background-color: #C62828; color: white; padding: 8px;") self.btn_stop.clicked.connect(self.stop_indexing) self.btn_stop.setEnabled(False) h_btns.addWidget(self.btn_update) h_btns.addWidget(self.btn_stop) self.lbl_status = QLabel("准备就绪") self.progress_bar = QProgressBar() self.progress_bar.setTextVisible(False) # 不显示百分比,因为没有总数 self.progress_bar.setRange(0, 0) self.progress_bar.hide() self.lbl_count = QLabel("已收录: 0") self.lbl_current = QLabel("...") self.lbl_current.setStyleSheet("color: gray; font-size: 10px;") v_ctrl.addLayout(h_btns) v_ctrl.addWidget(self.lbl_status) v_ctrl.addWidget(self.progress_bar) h_info = QHBoxLayout() h_info.addWidget(self.lbl_count) h_info.addWidget(self.lbl_current) v_ctrl.addLayout(h_info) group_ctrl.setLayout(v_ctrl) # === 3. 搜索区域 === group_search = QGroupBox("3. 
极速搜索") v_search = QVBoxLayout() h_filter = QHBoxLayout() self.chk_search_file = QCheckBox("搜文件") self.chk_search_file.setChecked(True) self.chk_search_file.stateChanged.connect(lambda: self.on_search(self.edit_search.text())) self.chk_search_dir = QCheckBox("搜文件夹") self.chk_search_dir.setChecked(True) self.chk_search_dir.stateChanged.connect(lambda: self.on_search(self.edit_search.text())) h_filter.addWidget(QLabel("过滤类型:")) h_filter.addWidget(self.chk_search_file) h_filter.addWidget(self.chk_search_dir) h_filter.addStretch() self.edit_search = QLineEdit() self.edit_search.setPlaceholderText("输入文件名关键词(空格隔开表示“与”,例如:合同 2024)...") self.edit_search.setStyleSheet("font-size: 14pt; padding: 6px;") self.edit_search.textChanged.connect(self.on_search) self.list_result = QListWidget() self.list_result.setStyleSheet("font-size: 11pt;") self.list_result.itemDoubleClicked.connect(self.open_item) v_search.addLayout(h_filter) v_search.addWidget(self.edit_search) v_search.addWidget(self.list_result) group_search.setLayout(v_search) # 添加到主布局 layout.addWidget(group_disk) layout.addWidget(group_ctrl) layout.addWidget(group_search) def get_drives(self): drives = [] for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ": d = f"{letter}:\\" if os.path.exists(d): drives.append(d) return drives def refresh_drives(self): # 清除旧的 for i in reversed(range(self.drives_layout.count())): self.drives_layout.itemAt(i).widget().setParent(None) self.checkboxes.clear() drives = self.get_drives() for i, d in enumerate(drives): cb = QCheckBox(d) # 默认只勾选 C 和 D,避免全部勾选太慢 if d.startswith("C") or d.startswith("D"): cb.setChecked(True) self.checkboxes[d] = cb self.drives_layout.addWidget(cb, i // 6, i % 6) def start_indexing(self): roots = [d for d, cb in self.checkboxes.items() if cb.isChecked()] if not roots: QMessageBox.warning(self, "提示", "请至少选择一个盘符!") return self.btn_update.setEnabled(False) self.btn_stop.setEnabled(True) self.progress_bar.show() self.list_result.clear() 
self.list_result.addItem("正在建立索引,建立过程中也可以尝试搜索...") self.indexer = FileIndexer() self.thread = threading.Thread(target=self.indexer.build_index, args=(roots,), daemon=True) self.indexer.signals.count_update.connect(lambda n: self.lbl_count.setText(f"已收录: {n:,}")) self.indexer.signals.status.connect(self.lbl_status.setText) self.indexer.signals.current_file.connect(self.lbl_current.setText) self.indexer.signals.finished.connect(self.on_finished) self.thread.start() def stop_indexing(self): self.indexer.stop() self.btn_stop.setEnabled(False) self.lbl_status.setText("正在停止...") def on_finished(self, total): self.progress_bar.hide() self.btn_update.setEnabled(True) self.btn_stop.setEnabled(False) self.lbl_status.setText("索引完成") self.lbl_current.setText("") QMessageBox.information(self, "完成", f"索引更新完毕!\n本次共收录 {total:,} 个项目。") self.list_result.clear() def on_search(self, text): text = text.strip() if not self.db_path.exists(): return # 检查过滤条件 show_files = self.chk_search_file.isChecked() show_dirs = self.chk_search_dir.isChecked() if not text: self.list_result.clear() return if not show_files and not show_dirs: self.list_result.clear() self.list_result.addItem("请至少勾选一种类型(文件或文件夹)") return tokens = text.split() fts_query_parts = [] for t in tokens: # 加上 * 使得搜索 "con" 能匹配 "config" # 语法:name : "keyword*" fts_query_parts.append(f'name : "{t}*"') fts_query = " AND ".join(fts_query_parts) # 构建类型过滤 SQL type_filters = [] if show_files: type_filters.append(0) if show_dirs: type_filters.append(1) type_sql = ",".join(map(str, type_filters)) try: conn = sqlite3.connect(str(self.db_path)) cur = conn.cursor() # 核心查询语句 sql = f""" SELECT name, path, is_dir FROM files WHERE files MATCH ? AND is_dir IN ({type_sql}) ORDER BY rank LIMIT 500 """ cur.execute(sql, (fts_query,)) rows = cur.fetchall() conn.close() self.list_result.clear() if not rows: self.list_result.addItem("未找到匹配项") return self.list_result.addItem(f"--- 找到 {len(rows)} 个结果 ---") for name, path, is_dir in rows: icon = "
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!