| 
精华5
阅读权限90
最后登录2025-6-21
在线时间141 小时
 累计签到:377 天连续签到:1 天
 
 域主 
 
 名望126 点 星币6580 枚 星辰15 颗 好评328 点 
 | 
 
| 
https://wwvu.lanzouq.com/icus82tvlt9c
×
注册登录后全站资源免费查看下载您需要 登录 才可以下载或查看,没有账号?立即注册  通过网盘分享的文件:电脑自动备份插入U盘数据.exe
 链接: https://pan.baidu.com/s/1fI_QTf3F6DKw7cFzMJr2Hw?pwd=52pj 提取码: 52pj
 
 
 复制代码import os
import shutil
import time
import string
import win32file # 需要安装 pywin32
import logging
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import scrolledtext
import queue
 
# --- 配置 ---
# 备份文件存储的基础路径 (请确保这个文件夹存在,或者脚本有权限创建它)
BACKUP_DESTINATION_BASE = r"D:\USB_Backups"
# 检测时间间隔(秒)
CHECK_INTERVAL = 5
# 日志文件路径
LOG_FILE = os.path.join(BACKUP_DESTINATION_BASE, "usb_backup_log.txt")
# --- 配置结束 ---
 
# --- GUI 相关 ---
class TextHandler(logging.Handler):
    """自定义日志处理器,将日志记录发送到 Text 控件"""
    def __init__(self, text_widget):
        logging.Handler.__init__(self)
        self.text_widget = text_widget
        self.queue = queue.Queue()
        # 启动一个线程来处理队列中的日志消息,避免阻塞主线程
        self.thread = threading.Thread(target=self.process_queue, daemon=True)
        self.thread.start()
 
    def emit(self, record):
        msg = self.format(record)
        self.queue.put(msg)
 
    def process_queue(self):
        while True:
            try:
                msg = self.queue.get()
                if msg is None: # Sentinel value to stop the thread
                    break
                # Schedule GUI update on the main thread
                def update_widget():
                    try:
                        self.text_widget.configure(state='normal')
                        self.text_widget.insert(tk.END, msg + '\n')
                        self.text_widget.configure(state='disabled')
                        self.text_widget.yview(tk.END)
                    except tk.TclError: # Handle cases where the widget might be destroyed
                        pass
                self.text_widget.after(0, update_widget)
                self.queue.task_done()
            except Exception:
                # 处理可能的窗口已销毁等异常
                import traceback
                traceback.print_exc()
                break
 
    def close(self):
        self.stop_processing() # Signal the thread to stop
        # Don't join here to avoid blocking the main thread
        logging.Handler.close(self)
 
    def stop_processing(self):
        """Signals the processing thread to stop without waiting for it."""
        self.queue.put(None) # Send sentinel to stop the processing thread
 
class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("USB 自动备份")
        self.geometry("600x400")
 
        self.log_text = scrolledtext.ScrolledText(self, state='disabled', wrap=tk.WORD)
        self.log_text.pack(expand=True, fill='both', padx=10, pady=5)
 
        self.status_label = tk.Label(self, text="状态: 初始化中...", anchor='w')
        self.status_label.pack(fill='x', padx=10, pady=2)
 
        self.exit_button = tk.Button(self, text="退出", command=self.quit_app)
        self.exit_button.pack(pady=5)
 
        self.backup_thread = None
        self.running = True
        self.protocol("WM_DELETE_WINDOW", self.quit_app)
 
        self.configure_logging()
 
    def configure_logging(self):
        # 日志配置前先确保备份目录存在
        if not os.path.exists(BACKUP_DESTINATION_BASE):
            try:
                os.makedirs(BACKUP_DESTINATION_BASE)
            except Exception as e:
                # 如果无法创建目录,在GUI中显示错误
                self.update_status(f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}")
                self.log_text.configure(state='normal')
                self.log_text.insert(tk.END, f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}\n")
                self.log_text.configure(state='disabled')
                return
 
        log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
 
        # 文件处理器
        file_handler = logging.FileHandler(LOG_FILE)
        file_handler.setFormatter(log_formatter)
 
        # GUI 文本框处理器
        self.text_handler = TextHandler(self.log_text)
        self.text_handler.setFormatter(log_formatter)
 
        # 获取根 logger 并添加处理器
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)
        # 清除可能存在的默认处理器(例如 basicConfig 创建的 StreamHandler)
        if root_logger.hasHandlers():
            root_logger.handlers.clear()
        root_logger.addHandler(file_handler)
        root_logger.addHandler(self.text_handler)
        # 添加一个 StreamHandler 以便在控制台也看到日志(调试用)
        # stream_handler = logging.StreamHandler()
        # stream_handler.setFormatter(log_formatter)
        # root_logger.addHandler(stream_handler)
 
    def update_status(self, message):
        # 使用 self.after 将 GUI 更新调度回主线程
        self.after(0, lambda: self.status_label.config(text=f"状态: {message}"))
 
    def start_backup_monitor(self):
        self.backup_thread = threading.Thread(target=run_backup_monitor, args=(self,), daemon=True)
        self.backup_thread.start()
 
    def quit_app(self):
        logging.info("收到退出信号,程序即将关闭。")
        self.running = False # Signal the backup thread to stop
 
        # Signal the logger thread to stop processing new messages
        if hasattr(self, 'text_handler'):
            self.text_handler.stop_processing()
 
        # Give the backup thread a short time to finish
        if self.backup_thread and self.backup_thread.is_alive():
            try:
                self.backup_thread.join(timeout=1.0) # Wait max 1 second
                if self.backup_thread.is_alive():
                    logging.warning("备份线程未能在1秒内停止,将强制关闭窗口。")
            except Exception as e:
                logging.error(f"等待备份线程时出错: {e}")
 
        # Close the main window. Daemon threads will be terminated.
        self.destroy()
        # os._exit(0) # Avoid force exit, let the application close naturally
 
# --- 核心备份逻辑 (从旧 main 函数提取) ---
def get_available_drives():
    """获取当前所有可用的驱动器盘符"""
    drives = []
    bitmask = win32file.GetLogicalDrives()
    for letter in string.ascii_uppercase:
        if bitmask & 1:
            drives.append(letter)
        bitmask >>= 1
    return set(drives)
 
def is_removable_drive(drive_letter):
    """判断指定盘符是否是可移动驱动器 (U盘通常是这个类型)"""
    drive_path = f"{drive_letter}:\"
    try:
        # DRIVE_REMOVABLE 的类型代码是 2
        return win32file.GetDriveTypeW(drive_path) == win32file.DRIVE_REMOVABLE
    except Exception as e:
        # logging.error(f"检查驱动器 {drive_path} 类型时出错: {e}") # 可能在驱动器刚插入时发生
        return False
 
def should_skip_file(src, dst):
    """判断是否需要跳过备份(增量备份逻辑)"""
    if not os.path.exists(dst):
        return False
    try:
        src_stat = os.stat(src)
        dst_stat = os.stat(dst)
        # 文件大小和修改时间都相同则跳过
        return src_stat.st_size == dst_stat.st_size and int(src_stat.st_mtime) == int(dst_stat.st_mtime)
    except Exception:
        return False
 
def copy_file_with_log(src, dst):
    try:
        file_size = os.path.getsize(src)
        # 超过128MB的大文件采用分块复制
        if file_size > 128 * 1024 * 1024:
            chunk_size = 16 * 1024 * 1024  # 16MB
            with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
                while True:
                    chunk = fsrc.read(chunk_size)
                    if not chunk:
                        break
                    fdst.write(chunk)
            # 尝试复制亓数据,如果失败则记录但不中断
            try:
                shutil.copystat(src, dst)
            except Exception as e_stat:
                logging.warning(f"无法复制亓数据 {src} -> {dst}: {e_stat}")
            logging.info(f"分块复制大文件: {src} -> {dst}")
        else:
            shutil.copy2(src, dst)
            logging.info(f"已复制: {src} -> {dst}")
    except Exception as e:
        logging.error(f"复制文件 {src} 时出错: {e}")
 
def threaded_copytree(src, dst, skip_exts=None, skip_dirs=None, max_workers=8):
    """线程池递归复制目录,支持增量备份和跳过指定类型,限制最大线程数"""
    if skip_exts is None:
        skip_exts = ['.tmp', '.log', '.sys']
    if skip_dirs is None:
        skip_dirs = ['$RECYCLE.BIN', 'System Volume Information']
    if not os.path.exists(dst):
        try:
            os.makedirs(dst)
        except Exception as e_mkdir:
            logging.error(f"创建目录 {dst} 失败: {e_mkdir}")
            return #无法创建目标目录,则无法继续复制
    tasks = []
    small_files = []
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for item in os.listdir(src):
                s = os.path.join(src, item)
                d = os.path.join(dst, item)
                try:
                    if os.path.isdir(s):
                        if item in skip_dirs:
                            logging.info(f"跳过目录: {s}")
                            continue
                        # 递归调用也放入线程池
                        tasks.append(executor.submit(threaded_copytree, s, d, skip_exts, skip_dirs, max_workers))
                    else:
                        ext = os.path.splitext(item)[1].lower()
                        if ext in skip_exts:
                            logging.info(f"跳过文件: {s}")
                            continue
                        if should_skip_file(s, d):
                            # logging.debug(f"跳过未变更文件: {s}") # 改为 debug 级别
                            continue
                        # 小于16MB的小文件批量处理
                        if os.path.getsize(s) < 16 * 1024 * 1024:
                            small_files.append((s, d))
                        else:
                            tasks.append(executor.submit(copy_file_with_log, s, d))
                except PermissionError:
                    logging.warning(f"无权限访问: {s},跳过")
                except FileNotFoundError:
                    logging.warning(f"文件或目录不存在(可能在扫描时被移除): {s},跳过")
                except Exception as e_item:
                    logging.error(f"处理 {s} 时出错: {e_item}")
 
            # 批量提交小文件任务,减少线程调度开销
            batch_size = 16
            for i in range(0, len(small_files), batch_size):
                batch = small_files[i:i+batch_size]
                tasks.append(executor.submit(batch_copy_files, batch))
 
            # 等待所有任务完成
            for future in as_completed(tasks):
                try:
                    future.result() # 获取结果以暴露异常
                except Exception as e_future:
                    logging.error(f"线程池任务出错: {e_future}")
    except PermissionError:
        logging.error(f"无权限访问源目录: {src}")
    except FileNotFoundError:
        logging.error(f"源目录不存在: {src}")
    except Exception as e_pool:
        logging.error(f"处理目录 {src} 时线程池出错: {e_pool}")
 
def batch_copy_files(file_pairs):
    for src, dst in file_pairs:
        copy_file_with_log(src, dst)
 
def backup_usb_drive(drive_letter, app_instance):
    """执行U盘备份(多线程+增量),并更新GUI状态"""
    source_drive = f"{drive_letter}:\"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    destination_folder = os.path.join(BACKUP_DESTINATION_BASE, f"Backup_{drive_letter}_{timestamp}")
 
    logging.info(f"检测到U盘: {source_drive}")
    app_instance.update_status(f"检测到U盘: {drive_letter}:\\,准备备份...")
    logging.info(f"开始备份到: {destination_folder}")
    app_instance.update_status(f"开始备份 {drive_letter}:\\ 到 {destination_folder}")
 
    start_time = time.time()
    try:
        threaded_copytree(source_drive, destination_folder, max_workers=16)
        end_time = time.time()
        duration = end_time - start_time
        logging.info(f"成功完成备份: {source_drive} -> {destination_folder} (耗时: {duration:.2f} 秒)")
        app_instance.update_status(f"备份完成: {drive_letter}:\\ (耗时: {duration:.2f} 秒)")
    except FileNotFoundError:
        logging.error(f"错误:源驱动器 {source_drive} 不存在或无法访问。")
        app_instance.update_status(f"错误: 无法访问 {drive_letter}:\")
    except PermissionError:
        logging.error(f"错误:没有权限读取 {source_drive} 或写入 {destination_folder}。请检查权限设置。")
        app_instance.update_status(f"错误: 权限不足 {drive_letter}:\\ 或目标文件夹")
    except Exception as e:
        logging.error(f"备份U盘 {source_drive} 时发生未知错误: {e}")
        app_instance.update_status(f"错误: 备份 {drive_letter}:\\ 时发生未知错误")
    finally:
        # 短暂显示完成/错误状态后,恢复到空闲状态
        app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入..."))
 
def run_backup_monitor(app_instance):
    """后台监控线程的主函数"""
    logging.info("U盘自动备份程序启动...")
    logging.info(f"备份将存储在: {BACKUP_DESTINATION_BASE}")
    app_instance.update_status("启动成功,等待U盘插入...")
 
    # 检查备份目录是否已成功创建(在 App 初始化时完成)
    if not os.path.exists(BACKUP_DESTINATION_BASE):
        logging.error(f"无法启动监控:备份目录 {BACKUP_DESTINATION_BASE} 不存在且无法创建。")
        app_instance.update_status(f"错误: 备份目录不存在且无法创建")
        return
 
    try:
        known_drives = get_available_drives()
        logging.info(f"当前已知驱动器: {sorted(list(known_drives))}")
    except Exception as e_init_drives:
        logging.error(f"初始化获取驱动器列表失败: {e_init_drives}")
        app_instance.update_status(f"错误: 获取驱动器列表失败")
        known_drives = set()
 
    while app_instance.running:
        try:
            app_instance.update_status("正在检测驱动器...")
            current_drives = get_available_drives()
            new_drives = current_drives - known_drives
            removed_drives = known_drives - current_drives
 
            if new_drives:
                logging.info(f"检测到新驱动器: {sorted(list(new_drives))}")
                for drive in new_drives:
                    if not app_instance.running: break # Check flag before potentially long operation
                    # 稍作等待,确保驱动器已准备好
                    logging.info(f"等待驱动器 {drive}: 准备就绪...")
                    app_instance.update_status(f"检测到新驱动器 {drive}:,等待准备就绪...")
                    time.sleep(3) # 增加等待时间
                    if not app_instance.running: break
                    try:
                        if is_removable_drive(drive):
                            backup_usb_drive(drive, app_instance)
                        else:
                            logging.info(f"驱动器 {drive}: 不是可移动驱动器,跳过备份。")
                            app_instance.update_status(f"驱动器 {drive}: 非U盘,跳过")
                            # 短暂显示后恢复空闲
                            app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
                    except Exception as e_check:
                         logging.error(f"检查或备份驱动器 {drive}: 时出错: {e_check}")
                         app_instance.update_status(f"错误: 处理驱动器 {drive}: 时出错")
                         app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
 
            if removed_drives:
                logging.info(f"检测到驱动器移除: {sorted(list(removed_drives))}")
                # Optionally update status for removed drives
                # app_instance.update_status(f"驱动器 {','.join(sorted(list(removed_drives)))} 已移除")
                # app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
 
            # 更新已知驱动器列表
            known_drives = current_drives
 
            # 在循环末尾更新状态为空闲(如果没有正在进行的草作)
            if not new_drives and app_instance.status_label.cget("text").startswith("状态: 正在检测驱动器"):
                 app_instance.update_status("空闲,等待U盘插入...")
 
            # 等待指定间隔,并允许提前退出
            interval_counter = 0
            while app_instance.running and interval_counter < CHECK_INTERVAL:
                time.sleep(1)
                interval_counter += 1
            if not app_instance.running:
                break
 
        except Exception as e:
            logging.error(f"主循环发生错误: {e}")
            app_instance.update_status(f"错误: {e}")
            # 防止因临时错误导致程序崩溃,稍等后继续,并允许提前退出
            error_wait_counter = 0
            while app_instance.running and error_wait_counter < CHECK_INTERVAL * 2:
                 time.sleep(1)
                 error_wait_counter += 1
            if not app_instance.running:
                break
 
    logging.info("后台监控线程已停止。")
    app_instance.update_status("程序已停止")
 
if __name__ == "__main__":
    app = App()
    app.start_backup_monitor()
    app.mainloop()
 
 | 
 |