"""Main window of the CodeBridge GUI.

Lets the user pick a source and a destination folder (directly or through a
saved profile), compares them in a background thread, lists added / modified /
deleted files in a filterable tree, and exports or imports change packages.
"""

import json
import logging
import os
import threading
import tkinter as tk
import tkinter.font as tkfont
from tkinter import ttk, filedialog, messagebox
from tkinter.scrolledtext import ScrolledText
from typing import Optional

# Externals (made importable by __main__.py adding their folders to sys.path)
try:
    from tkinter_logger import TkinterLogger
except Exception:
    TkinterLogger = None  # type: ignore
try:
    from resource_monitor import TkinterResourceMonitor, is_psutil_available
except Exception:
    TkinterResourceMonitor = None  # type: ignore

    def is_psutil_available() -> bool:
        """Fallback used when the resource_monitor module cannot be imported."""
        return False

from codebridge.core.comparer import CodeComparer
from codebridge.core.sync_manager import SyncManager
from codebridge.gui.diff_viewer import DiffViewer
from codebridge.gui.commit_dialog import CommitDialog
from codebridge.gui.profile_dialog import ProfileManagerDialog, ProfileManagerFrame

# --- Import Version Info FOR THE WRAPPER ITSELF ---
try:
    # Use absolute import based on package name
    from codebridge import _version as wrapper_version
    WRAPPER_APP_VERSION_STRING = f"{wrapper_version.__version__} ({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
    WRAPPER_BUILD_INFO = f"Wrapper Built: {wrapper_version.BUILD_TIMESTAMP}"
except (ImportError, AttributeError):
    # ImportError: running from source without a generated _version.py.
    # AttributeError: a _version.py exists but lacks one of the fields read above.
    WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"
    WRAPPER_BUILD_INFO = "Wrapper build time unknown"
# --- End Import Version Info ---

# --- Constants for Version Generation ---
DEFAULT_VERSION = "0.0.0+unknown"
DEFAULT_COMMIT = "Unknown"
DEFAULT_BRANCH = "Unknown"
# --- End Constants --

# Combobox entry meaning "no profile selected".
_NO_PROFILE = "(None)"

# (filter key / tree tag, label shown in the "Status" column)
_STATUS_LABELS = (
    ("added", "Added"),
    ("modified", "Modified"),
    ("deleted", "Deleted"),
)


class MainWindow:
    """
    Main GUI class for CodeBridge application.
    """

    def __init__(self, root: tk.Tk):
        """Build the window on ``root`` and wire up the optional externals."""
        self.root = root
        self.root.title(f"CodeBridge - Codebase Synchronizer - {WRAPPER_APP_VERSION_STRING}")
        # Make the window taller by default so status/log area is visible
        self.root.geometry("1000x760")
        # Enforce a reasonable minimum size so status bar remains visible
        try:
            self.root.minsize(900, 600)
        except tk.TclError:
            pass

        self.comparer = None
        self.sync_manager = SyncManager()

        # Initialize optional externals
        self.logger_sys = None
        # True once the logging handler writes into self.log_text; _log() then
        # skips its own direct write so messages are not duplicated.
        self._log_handler_attached = False
        if TkinterLogger is not None:
            try:
                self.logger_sys = TkinterLogger(self.root)
                # enable console by default; tkinter handler is attached below
                self.logger_sys.setup(enable_console=True, enable_tkinter=True)
                logging.getLogger(__name__).info("TkinterLogger initialized")
            except Exception:
                self.logger_sys = None

        # Resource monitor (separate var) and progress status var
        self._resource_monitor = None
        self.resource_var = tk.StringVar()
        self.progress_status_var = tk.StringVar()
        if TkinterResourceMonitor is not None and is_psutil_available():
            try:
                self._resource_monitor = TkinterResourceMonitor(
                    self.root, self.resource_var, poll_interval=1.0
                )
                self._resource_monitor.start()
            except Exception:
                self._resource_monitor = None
        else:
            # psutil not available -> show note on resource area
            self.resource_var.set("Resource monitor: psutil not installed")

        # Profiles: load/save path in program root
        try:
            repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
        except Exception:
            repo_root = os.getcwd()
        self._profiles_path = os.path.join(repo_root, "profiles.json")
        self.profiles = self._load_profiles()
        self.current_ignore_exts = []

        # Tree row id -> (status label, relative path) exactly as inserted.
        self._tree_rows = {}

        # Setup UI and close handler
        self._setup_ui()

        # Attach logger handler to persistent log widget if available
        if self.logger_sys and hasattr(self, "log_text"):
            try:
                self.logger_sys.add_tkinter_handler(self.log_text)
                self._log_handler_attached = True
            except Exception:
                logging.getLogger(__name__).exception(
                    "Failed to attach tkinter log handler to main UI"
                )

        # Populate profiles combobox if any
        try:
            self._populate_profiles()
        except Exception:
            logging.getLogger(__name__).exception("Failed to populate profiles")

        self.root.protocol("WM_DELETE_WINDOW", self._on_closing)

    def _setup_ui(self) -> None:
        """
        Initializes the user interface layout.
        """
        # Path selection frame
        path_frame = ttk.LabelFrame(self.root, text="Path Selection", padding=10)
        path_frame.pack(fill="x", padx=10, pady=5)

        # Profiles combobox + Manage button
        prof_frame = ttk.Frame(path_frame)
        prof_frame.grid(row=0, column=0, columnspan=3, sticky="ew", pady=(0, 6))
        ttk.Label(prof_frame, text="Profile:").pack(side="left")
        self.profile_var = tk.StringVar()
        self.profile_cb = ttk.Combobox(
            prof_frame, textvariable=self.profile_var, state="readonly", width=40
        )
        self.profile_cb.pack(side="left", padx=6)
        # Virtual event a ttk.Combobox generates when the user picks an entry.
        self.profile_cb.bind("<<ComboboxSelected>>", lambda e: self._on_profile_change())
        ttk.Button(
            prof_frame, text="⚙️ Manage Profiles...", command=self._open_manage_profiles
        ).pack(side="left")

        self.src_path_var = tk.StringVar()
        self.dst_path_var = tk.StringVar()
        self._create_path_row(path_frame, "Source (New):", self.src_path_var, 0)
        self._create_path_row(path_frame, "Destination (Old):", self.dst_path_var, 1)

        # Action buttons inside a labelled frame and filters to the right
        actions_row = ttk.Frame(self.root, padding=5)
        actions_row.pack(fill="x", padx=10)
        actions_frame = ttk.LabelFrame(actions_row, text="Actions", padding=6)
        actions_frame.pack(side="left", fill="x", expand=True)

        # create a prominent style for the main Compare button
        style = ttk.Style(self.root)
        try:
            # Kept on self: a tkinter Font deletes its named font when the
            # Python object is garbage-collected, which would drop the style.
            self._big_font = tkfont.Font(
                self.root, family="TkDefaultFont", size=11, weight="bold"
            )
            style.configure('Big.TButton', font=self._big_font, padding=(10, 6))
        except Exception:
            style.configure('Big.TButton', padding=(8, 4))

        self.compare_btn = ttk.Button(
            actions_frame, text="🔍 Compare Folders",
            command=self._run_comparison, style='Big.TButton',
        )
        self.compare_btn.pack(side="left", padx=8)
        self.export_btn = ttk.Button(
            actions_frame, text="📦 Export Changes (ZIP)", command=self._export_changes
        )
        self.export_btn.pack(side="left", padx=5)
        self.import_btn = ttk.Button(
            actions_frame, text="📥 Import Package (ZIP)", command=self._import_package
        )
        self.import_btn.pack(side="left", padx=5)

        # Filter panel on the right: show counts and allow toggling
        filter_frame = ttk.Frame(actions_row, padding=(6, 0))
        filter_frame.pack(side="right")
        self.filter_vars = {key: tk.BooleanVar(value=False) for key, _ in _STATUS_LABELS}
        # buttons that show counts; initial counts are 0
        self.filter_buttons = {}
        for key, label in _STATUS_LABELS:
            # key=key binds the current value; a plain closure would see the last one.
            button = ttk.Button(
                filter_frame, text=f"{label} (0)",
                command=lambda key=key: self._toggle_filter(key),
            )
            button.pack(side="left", padx=4)
            self.filter_buttons[key] = button

        # store last results for filtering
        self.last_added = []
        self.last_modified = []
        self.last_deleted = []

        # Results treeview
        self._setup_treeview()

    def _create_path_row(self, parent: ttk.Frame, label: str, var: tk.StringVar, row: int) -> None:
        """
        Creates a row with a label, entry and browse button for paths.

        The row is placed at grid row ``row + 1`` because grid row 0 of
        ``parent`` holds the profile selector.
        """
        ttk.Label(parent, text=label).grid(row=row + 1, column=0, sticky="w", padx=5)
        entry = ttk.Entry(parent, textvariable=var, width=80)
        entry.grid(row=row + 1, column=1, padx=5, pady=2)
        btn = ttk.Button(parent, text="📁 Browse", command=lambda: self._browse_folder(var))
        btn.grid(row=row + 1, column=2, padx=5)
        # store references so they can be made readonly when a profile is selected
        if label.lower().startswith("source"):
            self.src_entry = entry
            self.src_browse = btn
        else:
            self.dst_entry = entry
            self.dst_browse = btn

    def _browse_folder(self, var: tk.StringVar) -> None:
        """
        Opens a directory selection dialog and stores the choice in ``var``.
        """
        directory = filedialog.askdirectory()
        if directory:
            var.set(directory)

    def _setup_treeview(self) -> None:
        """
        Builds the progress bar, the results treeview, the log box and the status bar.
        """
        tree_frame = ttk.Frame(self.root, padding=10)
        tree_frame.pack(fill="both", expand=True)

        # Progress bar for long-running operations
        self.progress_var = tk.IntVar(value=0)
        self.progress = ttk.Progressbar(
            tree_frame, variable=self.progress_var, maximum=100, mode="determinate"
        )
        self.progress.pack(fill="x", pady=(0, 6))

        columns = ("status", "path")
        self.tree = ttk.Treeview(tree_frame, columns=columns, show="headings")
        self.tree.heading("status", text="Status")
        self.tree.heading("path", text="File Path")
        self.tree.column("status", width=100, stretch=False)
        self.tree.tag_configure("added", foreground="green")
        self.tree.tag_configure("modified", foreground="orange")
        self.tree.tag_configure("deleted", foreground="red")
        # Double left-click opens the diff viewer for the row.
        self.tree.bind("<Double-1>", self._on_item_double_click)

        scrollbar = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview)
        self.tree.configure(yscrollcommand=scrollbar.set)
        self.tree.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

        # Persistent log box below the treeview
        log_frame = ttk.Frame(self.root, padding=(5, 2))
        log_frame.pack(fill="x")
        # Slightly reduce log height to keep status bar visible on smaller screens
        self.log_text = ScrolledText(log_frame, height=6, state=tk.DISABLED)
        self.log_text.pack(fill="both", expand=True)

        # Status bar at bottom: left = progress/status, right = resource monitor
        status_frame = ttk.Frame(self.root, padding=(5, 2))
        status_frame.pack(fill="x", side="bottom")
        ttk.Label(status_frame, textvariable=self.progress_status_var).pack(side="left")
        ttk.Label(status_frame, textvariable=self.resource_var).pack(side="right")

    def _log(self, message: str) -> None:
        """Write message to both logging and the embedded ScrolledText log.

        When the TkinterLogger handler is attached to the log box, the logging
        call already reaches the widget, so no direct write is made. Otherwise
        the text is appended directly, scheduled onto the Tk thread when this
        is called from a worker thread.
        """
        try:
            logging.getLogger(__name__).info(message)
        except Exception:
            pass
        if getattr(self, "_log_handler_attached", False):
            return
        if threading.current_thread() is threading.main_thread():
            self._append_to_log_widget(message)
        else:
            try:
                self.root.after(0, self._append_to_log_widget, message)
            except Exception:
                pass  # window already gone

    def _append_to_log_widget(self, message: str) -> None:
        """Append one line to the read-only log box (Tk thread only)."""
        widget = getattr(self, "log_text", None)
        if widget is None:
            return
        try:
            widget.config(state=tk.NORMAL)
            try:
                widget.insert("end", message + "\n")
                widget.see("end")
            finally:
                # Always restore read-only, even if the insert failed.
                widget.config(state=tk.DISABLED)
        except tk.TclError:
            pass  # widget destroyed

    def _update_progress(self, value: int, maximum: int, phase: str,
                         count_display: Optional[str] = None) -> None:
        """Set the progress bar and the status text.

        The status reads ``"<phase>: <count_display>"`` when ``count_display``
        is given, else ``"<phase>: <value>/<maximum>"``.
        """
        try:
            self.progress.config(maximum=maximum)
            self.progress_var.set(value)
            shown = count_display if count_display else f"{value}/{maximum}"
            self.progress_status_var.set(f"{phase}: {shown}")
        except Exception:
            pass

    def _toggle_filter(self, status: str) -> None:
        """Toggle a filter button and refresh the treeview."""
        flag = self.filter_vars.get(status)
        if flag is None:
            return
        flag.set(not flag.get())
        # update visual style of button to indicate active/inactive
        try:
            btn = self.filter_buttons.get(status)
            if btn:
                btn.state(["pressed" if flag.get() else "!pressed"])
        except Exception:
            pass
        self._refresh_tree()

    def _refresh_tree(self) -> None:
        """Repopulates the treeview according to current filter selections."""
        try:
            existing = self.tree.get_children()
            if existing:
                self.tree.delete(*existing)
            self._tree_rows = {}

            # gather active filters; if no filter active, show all
            active = {key for key, flag in self.filter_vars.items() if flag.get()}
            results = {
                "added": self.last_added,
                "modified": self.last_modified,
                "deleted": self.last_deleted,
            }
            for key, label in _STATUS_LABELS:
                if active and key not in active:
                    continue
                for path in results[key]:
                    row_id = self.tree.insert("", "end", values=(label, path), tags=(key,))
                    self._tree_rows[row_id] = (label, path)
        except Exception as e:
            self._log(f"Error refreshing tree: {e}")

    def _update_filter_counts(self) -> None:
        """Update the text of filter buttons to reflect current counts."""
        try:
            self.filter_buttons["added"].config(text=f"Added ({len(self.last_added)})")
            self.filter_buttons["modified"].config(text=f"Modified ({len(self.last_modified)})")
            self.filter_buttons["deleted"].config(text=f"Deleted ({len(self.last_deleted)})")
        except Exception:
            pass

    # --- Profiles persistence ---
    def _load_profiles(self):
        """Return the profiles stored at ``self._profiles_path``.

        Returns an empty dict when the file is missing, unreadable, not valid
        JSON, or does not contain a JSON object.
        """
        try:
            if os.path.exists(self._profiles_path):
                with open(self._profiles_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    return data
                logging.getLogger(__name__).warning(
                    "Ignoring %s: expected a JSON object", self._profiles_path
                )
        except (OSError, ValueError):
            logging.getLogger(__name__).exception(
                "Could not load profiles from %s", self._profiles_path
            )
        return {}

    def _save_profiles(self):
        """Write ``self.profiles`` to ``self._profiles_path`` (failures are logged)."""
        try:
            with open(self._profiles_path, "w", encoding="utf-8") as f:
                json.dump(self.profiles, f, indent=2)
        except (OSError, TypeError, ValueError):
            logging.getLogger(__name__).exception(
                "Could not save profiles to %s", self._profiles_path
            )

    def _populate_profiles(self):
        """Fill the profile combobox, keeping the current selection if it still exists."""
        previous = self.profile_var.get()
        names = [_NO_PROFILE] + sorted(self.profiles.keys())
        self.profile_cb["values"] = names
        self.profile_cb.set(previous if previous in names else names[0])
        # A profile was active: re-apply it (its paths may have been edited) or,
        # if it was removed, unlock the path entries it had made read-only.
        if previous and previous != _NO_PROFILE:
            self._on_profile_change()

    def _open_manage_profiles(self):
        """Open the profile manager dialog and reload profiles once it closes."""
        self._log("Opening Manage Profiles dialog")
        dlg = ProfileManagerDialog(self.root, self._profiles_path, self.profiles)
        self.root.wait_window(dlg)
        # reload and refresh
        self.profiles = self._load_profiles()
        self._populate_profiles()
        self._log("Profiles reloaded")

    def _close_profile_panel(self):
        """Destroy the embedded profile panel, if any, then reload profiles."""
        try:
            panel = getattr(self, "profile_panel", None)
            if panel and panel.winfo_exists():
                panel.destroy()
        except Exception:
            pass
        # reload and refresh
        self.profiles = self._load_profiles()
        self._populate_profiles()

    def _set_path_widgets_locked(self, locked: bool) -> None:
        """Make the path entries read-only and disable Browse, or undo that."""
        entry_state = "readonly" if locked else "normal"
        button_state = "disabled" if locked else "normal"
        try:
            self.src_entry.config(state=entry_state)
            self.dst_entry.config(state=entry_state)
            self.src_browse.config(state=button_state)
            self.dst_browse.config(state=button_state)
        except (AttributeError, tk.TclError):
            pass

    def _on_profile_change(self):
        """Apply the profile selected in the combobox.

        With no profile selected the path entries become editable and the
        ignore list is cleared. Otherwise the profile's ``source``,
        ``destination`` and ``ignore_extensions`` are loaded and the path
        widgets are locked.
        """
        sel = self.profile_var.get()
        if not sel or sel == _NO_PROFILE:
            # enable editing of entries
            self._set_path_widgets_locked(False)
            self.current_ignore_exts = []
            self._log("Profile selection cleared")
            return

        profile = self.profiles.get(sel)
        if not profile:
            return

        # set entries and make readonly
        self.src_path_var.set(profile.get("source", ""))
        self.dst_path_var.set(profile.get("destination", ""))

        # load ignore extensions for comparisons
        ign = profile.get("ignore_extensions", [])
        if isinstance(ign, (list, tuple)):
            self.current_ignore_exts = list(ign)
        elif isinstance(ign, str) and ign:
            # comma-separated string; make sure every entry starts with a dot
            parts = (piece.strip() for piece in ign.split(','))
            self.current_ignore_exts = [
                ext if ext.startswith('.') else '.' + ext for ext in parts if ext
            ]
        else:
            self.current_ignore_exts = []

        self._set_path_widgets_locked(True)
        self._log(f"Selected profile: {sel}")

    def _set_actions_enabled(self, enabled: bool) -> None:
        """Enable or disable the Compare / Export / Import buttons."""
        state = "normal" if enabled else "disabled"
        try:
            self.compare_btn.config(state=state)
            self.export_btn.config(state=state)
            self.import_btn.config(state=state)
        except Exception:
            pass

    def _run_comparison(self) -> None:
        """
        Executes comparison in a background thread and populates the treeview.

        The progress bar has ``2 * total`` steps (``total`` = files counted in
        the source folder): one per source file scanned plus one per file hashed.
        """
        src = self.src_path_var.get()
        dst = self.dst_path_var.get()
        if not src or not dst:
            messagebox.showwarning("Warning", "Please select both source and destination folders.")
            return

        self._log(f"Starting comparison: src={src} dst={dst}")
        comparer = CodeComparer(src, dst, ignore_extensions=self.current_ignore_exts)
        self.comparer = comparer

        # Disable here, on the Tk thread, so a second comparison cannot be
        # started before the worker gets going.
        self._set_actions_enabled(False)

        def worker():
            try:
                # phase 1: quick count
                try:
                    total = comparer.count_files(src)
                except Exception:
                    total = 0
                if total <= 0:
                    total = 1  # keep the progress bar maximum non-zero
                progress_max = total * 2

                # initialize progress
                self.root.after(0, self._update_progress, 0, progress_max,
                                "Counting", f"0/{total}")

                scanned = 0
                hashed = 0

                def progress_handler(*cb_args):
                    nonlocal scanned, hashed
                    # get_relative_files calls (root_path, full_path, relative_path)
                    if len(cb_args) == 3:
                        root_path = cb_args[0]
                        if root_path == src:
                            scanned += 1
                            # Values are passed as arguments so they are fixed
                            # now, not when the Tk thread runs the callback.
                            self.root.after(0, self._update_progress, scanned, progress_max,
                                            "Scanning", f"{scanned}/{total}")
                    else:
                        # hashing phase: comparer sends a tuple like ("hash", relative_path)
                        arg = cb_args[0] if cb_args else None
                        if isinstance(arg, tuple) and arg and arg[0] == "hash":
                            hashed += 1
                            self.root.after(0, self._update_progress, scanned + hashed,
                                            progress_max, "Hashing", f"{hashed}/{total}")

                # perform compare with progress handler
                added, modified, deleted = comparer.compare(progress_callback=progress_handler)

                # ensure progress complete
                self.root.after(0, self._update_progress, progress_max, progress_max,
                                "Done", f"{total}/{total}")

                # populate tree in main thread
                def finish_ui():
                    try:
                        # store results and refresh according to filters
                        self.last_added = added
                        self.last_modified = modified
                        self.last_deleted = deleted
                        self._update_filter_counts()
                        self._refresh_tree()
                        self._log(
                            f"Comparison complete. Added={len(added)} "
                            f"Modified={len(modified)} Deleted={len(deleted)}"
                        )
                        self.progress_status_var.set("")
                        self.progress_var.set(0)
                    finally:
                        self._set_actions_enabled(True)

                self.root.after(0, finish_ui)
            except Exception as e:
                # Build the text now: ``e`` is unbound once this block exits.
                error_text = f"Error during comparison: {e}"

                def fail_ui():
                    # ensure UI enabled on error and clear the stale progress
                    self._set_actions_enabled(True)
                    try:
                        self.progress_status_var.set("")
                        self.progress_var.set(0)
                    except Exception:
                        pass
                    self._log(error_text)

                try:
                    self.root.after(0, fail_ui)
                except Exception:
                    pass  # window already gone

        threading.Thread(target=worker, daemon=True).start()
        # Background comparison started; UI will be updated when finished.
        self._log("Background comparison started")

    def _on_item_double_click(self, event) -> None:
        """
        Opens the DiffViewer when a file is double-clicked.
        """
        selection = self.tree.selection()
        if not selection:
            return

        item_id = selection[0]
        # Prefer the strings recorded at insert time: values read back from
        # the Treeview can come back as numbers for numeric-looking names.
        row = self._tree_rows.get(item_id)
        if row is None:
            values = self.tree.item(item_id)["values"]
            if len(values) < 2:
                return
            row = (str(values[0]), str(values[1]))
        status, relative_path = row

        if status == "Deleted":
            messagebox.showinfo("Info", "File was deleted. Nothing to compare.")
            return

        source_full_path = os.path.join(self.src_path_var.get(), relative_path)
        dest_full_path = os.path.join(self.dst_path_var.get(), relative_path)
        DiffViewer(self.root, source_full_path, dest_full_path, relative_path)

    def _export_changes(self) -> None:
        """
        Prompts for a commit message and exports changes to a ZIP.
        """
        if not self.comparer:
            messagebox.showwarning("Warning", "Please run comparison first.")
            return

        dialog = CommitDialog(self.root)
        commit_msg = dialog.get_message()
        if not commit_msg:
            messagebox.showwarning("Cancelled", "Export cancelled: Commit message is required.")
            return

        output_zip = filedialog.asksaveasfilename(
            defaultextension=".zip", filetypes=[("ZIP files", "*.zip")]
        )
        if not output_zip:
            return

        changes = {
            "added": self.comparer.added,
            "modified": self.comparer.modified,
            "deleted": self.comparer.deleted,
        }
        try:
            self.sync_manager.create_export_package(
                self.src_path_var.get(), changes, output_zip, commit_msg
            )
        except Exception as e:
            self._log(f"Export failed: {e}")
            messagebox.showerror("Error", f"Export failed: {e}")
            return
        messagebox.showinfo("Success", f"Package created: {output_zip}")

    def _import_package(self) -> None:
        """
        Selects a ZIP package and applies it to the destination folder.
        """
        dst = self.dst_path_var.get()
        if not dst:
            messagebox.showwarning("Warning", "Please select a destination folder first.")
            return

        package_path = filedialog.askopenfilename(filetypes=[("ZIP files", "*.zip")])
        if not package_path:
            return

        confirm = messagebox.askyesno(
            "Confirm Import", "This will overwrite/delete files in the destination. Continue?"
        )
        if not confirm:
            return

        try:
            commit_msg = self.sync_manager.apply_package(package_path, dst)
        except Exception as e:
            self._log(f"Import failed: {e}")
            messagebox.showerror("Error", f"Import failed: {e}")
            return
        messagebox.showinfo(
            "Import Complete",
            f"Package applied successfully.\n\nCommit Message:\n{commit_msg}",
        )
        self._run_comparison()

    def _on_closing(self) -> None:
        """Clean shutdown: stop resource monitor and logging, then exit."""
        if self._resource_monitor:
            try:
                self._resource_monitor.stop()
            except Exception:
                pass
        if self.logger_sys:
            try:
                self.logger_sys.shutdown()
            except Exception:
                pass
        try:
            self.root.destroy()
        except Exception:
            # fallback: the window could not be destroyed, force the process out
            os._exit(0)