diff --git a/keepercommander/__init__.py b/keepercommander/__init__.py index 73bac478d..e5ba75178 100644 --- a/keepercommander/__init__.py +++ b/keepercommander/__init__.py @@ -10,4 +10,4 @@ # Contact: ops@keepersecurity.com # -__version__ = '17.2.5' +__version__ = '17.2.6' diff --git a/keepercommander/api.py b/keepercommander/api.py index 73a8bd9b6..af0e1b3e9 100644 --- a/keepercommander/api.py +++ b/keepercommander/api.py @@ -486,12 +486,28 @@ def search_records(params, searchstring): return search_results -def search_shared_folders(params, searchstring): - """Search shared folders """ +def search_shared_folders(params, searchstring, use_regex=False): + """Search shared folders. - p = re.compile(searchstring.lower()) + Args: + params: KeeperParams + searchstring: Search string (tokens or regex depending on use_regex) + use_regex: If True, treat as regex. If False (default), token-based search. + """ + search_results = [] - search_results = [] + if not searchstring: + return search_results + + if use_regex: + p = re.compile(searchstring.lower()) + match_func = lambda target: p.search(target) + else: + # Token-based search: all tokens must match + tokens = [t.lower() for t in searchstring.split() if t.strip()] + if not tokens: + return search_results + match_func = lambda target: all(token in target for token in tokens) for shared_folder_uid in params.shared_folder_cache: @@ -499,29 +515,45 @@ def search_shared_folders(params, searchstring): sf = get_shared_folder(params, shared_folder_uid) target = sf.to_lowerstring() - if p.search(target): + if match_func(target): logging.debug('Search success') search_results.append(sf) - + return search_results -def search_teams(params, searchstring): - """Search teams """ +def search_teams(params, searchstring, use_regex=False): + """Search teams. - p = re.compile(searchstring.lower()) + Args: + params: KeeperParams + searchstring: Search string (tokens or regex depending on use_regex) + use_regex: If True, treat as regex. If False (default), token-based search. + """ + search_results = [] - search_results = [] + if not searchstring: + return search_results + + if use_regex: + p = re.compile(searchstring.lower()) + match_func = lambda target: p.search(target) + else: + # Token-based search: all tokens must match + tokens = [t.lower() for t in searchstring.split() if t.strip()] + if not tokens: + return search_results + match_func = lambda target: all(token in target for token in tokens) for team_uid in params.team_cache: team = get_team(params, team_uid) target = team.to_lowerstring() - if p.search(target): + if match_func(target): logging.debug('Search success') search_results.append(team) - + return search_results diff --git a/keepercommander/auth/console_ui.py b/keepercommander/auth/console_ui.py index e930cc829..0145d10b1 100644 --- a/keepercommander/auth/console_ui.py +++ b/keepercommander/auth/console_ui.py @@ -23,38 +23,38 @@ def __init__(self): def on_device_approval(self, step): if self._show_device_approval_help: - print(f"\n{Fore.YELLOW}Device Approval Required{Fore.RESET}\n") - print(f"{Fore.CYAN}Select an approval method:{Fore.RESET}") - print(f" {Fore.GREEN}1{Fore.RESET}. Email - Send approval link to your email") - print(f" {Fore.GREEN}2{Fore.RESET}. Keeper Push - Send notification to an approved device") - print(f" {Fore.GREEN}3{Fore.RESET}. 2FA Push - Send code via your 2FA method") - print() - print(f" {Fore.GREEN}c{Fore.RESET}. Enter code - Enter a verification code") - print(f" {Fore.GREEN}q{Fore.RESET}. Cancel login") - print() + logging.info(f"\n{Fore.YELLOW}Device Approval Required{Fore.RESET}\n") + logging.info(f"{Fore.CYAN}Select an approval method:{Fore.RESET}") + logging.info(f" {Fore.GREEN}1{Fore.RESET}. Email - Send approval link to your email") + logging.info(f" {Fore.GREEN}2{Fore.RESET}. Keeper Push - Send notification to an approved device") + logging.info(f" {Fore.GREEN}3{Fore.RESET}. 2FA Push - Send code via your 2FA method") + logging.info("") + logging.info(f" {Fore.GREEN}c{Fore.RESET}. Enter code - Enter a verification code") + logging.info(f" {Fore.GREEN}q{Fore.RESET}. Cancel login") + logging.info("") self._show_device_approval_help = False else: - print(f"\n{Fore.YELLOW}Waiting for device approval.{Fore.RESET}") - print(f"{Fore.CYAN}Check email, SMS, or push notification on the approved device.{Fore.RESET}") - print(f"Enter {Fore.GREEN}c {Fore.RESET} to submit a verification code.\n") + logging.info(f"\n{Fore.YELLOW}Waiting for device approval.{Fore.RESET}") + logging.info(f"{Fore.CYAN}Check email, SMS, or push notification on the approved device.{Fore.RESET}") + logging.info(f"Enter {Fore.GREEN}c {Fore.RESET} to submit a verification code.\n") try: selection = input(f'{Fore.GREEN}Selection{Fore.RESET} (or Enter to check status): ').strip().lower() if selection == '1' or selection == 'email_send' or selection == 'es': step.send_push(login_steps.DeviceApprovalChannel.Email) - print(f"\n{Fore.GREEN}Email sent to {step.username}{Fore.RESET}") - print("Click the approval link in the email, then press Enter.\n") + logging.info(f"\n{Fore.GREEN}Email sent to {step.username}{Fore.RESET}") + logging.info("Click the approval link in the email, then press Enter.\n") elif selection == '2' or selection == 'keeper_push' or selection == 'kp': step.send_push(login_steps.DeviceApprovalChannel.KeeperPush) - print(f"\n{Fore.GREEN}Push notification sent.{Fore.RESET}") - print("Approve on your device, then press Enter.\n") + logging.info(f"\n{Fore.GREEN}Push notification sent.{Fore.RESET}") + logging.info("Approve on your device, then press Enter.\n") elif selection == '3' or selection == '2fa_send' or selection == '2fs': step.send_push(login_steps.DeviceApprovalChannel.TwoFactor) - print(f"\n{Fore.GREEN}2FA code sent.{Fore.RESET}") - print("Enter the code using option 'c'.\n") + logging.info(f"\n{Fore.GREEN}2FA code sent.{Fore.RESET}") + logging.info("Enter the code using option 'c'.\n") elif selection == 'c' or selection.startswith('c '): # Support both "c" (prompts for code) and "c " (code inline) @@ -67,23 +67,23 @@ def on_device_approval(self, step): # Try email code first, then 2FA try: step.send_code(login_steps.DeviceApprovalChannel.Email, code_input) - print(f"{Fore.GREEN}Successfully verified email code.{Fore.RESET}") + logging.info(f"{Fore.GREEN}Successfully verified email code.{Fore.RESET}") except KeeperApiError: try: step.send_code(login_steps.DeviceApprovalChannel.TwoFactor, code_input) - print(f"{Fore.GREEN}Successfully verified 2FA code.{Fore.RESET}") + logging.info(f"{Fore.GREEN}Successfully verified 2FA code.{Fore.RESET}") except KeeperApiError as e: - print(f"{Fore.YELLOW}Invalid code. Please try again.{Fore.RESET}") + logging.warning(f"{Fore.YELLOW}Invalid code. Please try again.{Fore.RESET}") elif selection.startswith("email_code="): code = selection.replace("email_code=", "") step.send_code(login_steps.DeviceApprovalChannel.Email, code) - print(f"{Fore.GREEN}Successfully verified email code.{Fore.RESET}") + logging.info(f"{Fore.GREEN}Successfully verified email code.{Fore.RESET}") elif selection.startswith("2fa_code="): code = selection.replace("2fa_code=", "") step.send_code(login_steps.DeviceApprovalChannel.TwoFactor, code) - print(f"{Fore.GREEN}Successfully verified 2FA code.{Fore.RESET}") + logging.info(f"{Fore.GREEN}Successfully verified 2FA code.{Fore.RESET}") elif selection == 'q': step.cancel() @@ -92,14 +92,12 @@ def on_device_approval(self, step): step.resume() else: - print(f"{Fore.YELLOW}Invalid selection. Enter 1, 2, 3, c, q, or press Enter.{Fore.RESET}") + logging.warning(f"{Fore.YELLOW}Invalid selection. Enter 1, 2, 3, c, q, or press Enter.{Fore.RESET}") except KeyboardInterrupt: step.cancel() except KeeperApiError as kae: - print() - print(f'{Fore.YELLOW}{kae.message}{Fore.RESET}') - pass + logging.warning(f'{Fore.YELLOW}{kae.message}{Fore.RESET}') @staticmethod def two_factor_channel_to_desc(channel): # type: (login_steps.TwoFactorChannel) -> str @@ -122,13 +120,13 @@ def on_two_factor(self, step): channels = step.get_channels() if self._show_two_factor_help: - print(f"\n{Fore.YELLOW}Two-Factor Authentication Required{Fore.RESET}\n") - print(f"{Fore.CYAN}Select your 2FA method:{Fore.RESET}") + logging.info(f"\n{Fore.YELLOW}Two-Factor Authentication Required{Fore.RESET}\n") + logging.info(f"{Fore.CYAN}Select your 2FA method:{Fore.RESET}") for i in range(len(channels)): channel = channels[i] - print(f" {Fore.GREEN}{i+1}{Fore.RESET}. {ConsoleLoginUi.two_factor_channel_to_desc(channel.channel_type)} {channel.channel_name} {channel.phone}") - print(f" {Fore.GREEN}q{Fore.RESET}. Cancel login") - print() + logging.info(f" {Fore.GREEN}{i+1}{Fore.RESET}. {ConsoleLoginUi.two_factor_channel_to_desc(channel.channel_type)} {channel.channel_name} {channel.phone}") + logging.info(f" {Fore.GREEN}q{Fore.RESET}. Cancel login") + logging.info("") self._show_device_approval_help = False channel = None # type: Optional[login_steps.TwoFactorChannelInfo] @@ -143,9 +141,9 @@ def on_two_factor(self, step): channel = channels[idx-1] logging.debug(f"Selected {idx}. {ConsoleLoginUi.two_factor_channel_to_desc(channel.channel_type)}") else: - print("Invalid entry, additional factors of authentication shown may be configured if not currently enabled.") + logging.warning("Invalid entry, additional factors of authentication shown may be configured if not currently enabled.") else: - print("Invalid entry, additional factors of authentication shown may be configured if not currently enabled.") + logging.warning("Invalid entry, additional factors of authentication shown may be configured if not currently enabled.") mfa_prompt = False @@ -155,9 +153,9 @@ def on_two_factor(self, step): mfa_prompt = True try: step.send_push(channel.channel_uid, login_steps.TwoFactorPushAction.TextMessage) - print(f'\n{Fore.GREEN}SMS sent successfully.{Fore.RESET}\n') + logging.info(f'\n{Fore.GREEN}SMS sent successfully.{Fore.RESET}\n') except KeeperApiError: - print("Was unable to send SMS.") + logging.warning("Was unable to send SMS.") elif channel.channel_type == login_steps.TwoFactorChannel.SecurityKey: try: from ..yubikey.yubikey import yubikey_authenticate @@ -178,14 +176,14 @@ def on_two_factor(self, step): } step.duration = login_steps.TwoFactorDuration.EveryLogin step.send_code(channel.channel_uid, json.dumps(signature)) - print(f'{Fore.GREEN}Security key verified.{Fore.RESET}') + logging.info(f'{Fore.GREEN}Security key verified.{Fore.RESET}') except ImportError as e: from ..yubikey import display_fido2_warning display_fido2_warning() logging.warning(e) except KeeperApiError: - print(f'{Fore.RED}Unable to verify security key.{Fore.RESET}') + logging.error(f'{Fore.RED}Unable to verify security key.{Fore.RESET}') except Exception as e: logging.error(e) @@ -228,7 +226,7 @@ def on_two_factor(self, step): 'Ask Every 24 hours' if mfa_expiration == login_steps.TwoFactorDuration.Every24Hours else 'Ask Every 30 days', "|".join(allowed_expirations)) - print(prompt_exp) + logging.info(prompt_exp) try: answer = input(f'\n{Fore.GREEN}Enter 2FA Code: {Fore.RESET}') @@ -240,7 +238,7 @@ def on_two_factor(self, step): if m_duration: answer = m_duration.group(1).strip().lower() if answer not in allowed_expirations: - print(f'Invalid 2FA Duration: {answer}') + logging.warning(f'Invalid 2FA Duration: {answer}') answer = '' if answer == 'login': @@ -264,16 +262,16 @@ def on_two_factor(self, step): step.duration = mfa_expiration try: step.send_code(channel.channel_uid, otp_code) - print(f'{Fore.GREEN}2FA code verified.{Fore.RESET}') + logging.info(f'{Fore.GREEN}2FA code verified.{Fore.RESET}') except KeeperApiError: - print(f'{Fore.YELLOW}Invalid 2FA code. Please try again.{Fore.RESET}') + logging.warning(f'{Fore.YELLOW}Invalid 2FA code. Please try again.{Fore.RESET}') def on_password(self, step): if self._show_password_help: logging.info(f'{Fore.CYAN}Enter master password for {Fore.WHITE}{step.username}{Fore.RESET}') if self._failed_password_attempt > 0: - print(f'{Fore.YELLOW}Forgot password? Type "recover"{Fore.RESET}') + logging.info(f'{Fore.YELLOW}Forgot password? Type "recover"{Fore.RESET}') password = getpass.getpass(prompt=f'{Fore.GREEN}Password: {Fore.RESET}', stream=None) if not password: @@ -284,7 +282,7 @@ def on_password(self, step): try: step.verify_password(password) except KeeperApiError as kae: - print(kae.message) + logging.warning(kae.message) except KeyboardInterrupt: step.cancel() @@ -300,19 +298,19 @@ def on_sso_redirect(self, step): wb = None sp_url = step.sso_login_url - print(f'\n{Fore.CYAN}SSO Login URL:{Fore.RESET}\n{sp_url}\n') + logging.info(f'\n{Fore.CYAN}SSO Login URL:{Fore.RESET}\n{sp_url}\n') if self._show_sso_redirect_help: - print(f'{Fore.CYAN}Navigate to SSO Login URL with your browser and complete login.{Fore.RESET}') - print(f'{Fore.CYAN}Copy the returned SSO Token and paste it here.{Fore.RESET}') - print(f'{Fore.YELLOW}TIP: Click "Copy login token" button on the SSO Connect page.{Fore.RESET}') - print('') - print(f' {Fore.GREEN}a{Fore.RESET}. SSO User with a Master Password') - print(f' {Fore.GREEN}c{Fore.RESET}. Copy SSO Login URL to clipboard') + logging.info(f'{Fore.CYAN}Navigate to SSO Login URL with your browser and complete login.{Fore.RESET}') + logging.info(f'{Fore.CYAN}Copy the returned SSO Token and paste it here.{Fore.RESET}') + logging.info(f'{Fore.YELLOW}TIP: Click "Copy login token" button on the SSO Connect page.{Fore.RESET}') + logging.info('') + logging.info(f' {Fore.GREEN}a{Fore.RESET}. SSO User with a Master Password') + logging.info(f' {Fore.GREEN}c{Fore.RESET}. Copy SSO Login URL to clipboard') if wb: - print(f' {Fore.GREEN}o{Fore.RESET}. Open SSO Login URL in web browser') - print(f' {Fore.GREEN}p{Fore.RESET}. Paste SSO Token from clipboard') - print(f' {Fore.GREEN}q{Fore.RESET}. Cancel SSO login') - print() + logging.info(f' {Fore.GREEN}o{Fore.RESET}. Open SSO Login URL in web browser') + logging.info(f' {Fore.GREEN}p{Fore.RESET}. Paste SSO Token from clipboard') + logging.info(f' {Fore.GREEN}q{Fore.RESET}. Cancel SSO login') + logging.info('') self._show_sso_redirect_help = False while True: @@ -331,25 +329,25 @@ def on_sso_redirect(self, step): token = None try: pyperclip.copy(sp_url) - print('SSO Login URL is copied to clipboard.') + logging.info('SSO Login URL is copied to clipboard.') except: - print('Failed to copy SSO Login URL to clipboard.') + logging.warning('Failed to copy SSO Login URL to clipboard.') elif token == 'o': token = None if wb: try: wb.open_new_tab(sp_url) except: - print('Failed to open web browser.') + logging.warning('Failed to open web browser.') elif token == 'p': try: token = pyperclip.paste() except: token = '' - logging.info('Failed to paste from clipboard') + logging.warning('Failed to paste from clipboard') else: if len(token) < 10: - print(f'Unsupported menu option: {token}') + logging.warning(f'Unsupported menu option: {token}') token = None if token: step.set_sso_token(token) @@ -357,14 +355,14 @@ def on_sso_redirect(self, step): def on_sso_data_key(self, step): if self._show_sso_data_key_help: - print(f'\n{Fore.YELLOW}Device Approval Required for SSO{Fore.RESET}\n') - print(f'{Fore.CYAN}Select an approval method:{Fore.RESET}') - print(f' {Fore.GREEN}1{Fore.RESET}. Keeper Push - Send a push notification to your device') - print(f' {Fore.GREEN}2{Fore.RESET}. Admin Approval - Request your admin to approve this device') - print('') - print(f' {Fore.GREEN}r{Fore.RESET}. Resume SSO login after device is approved') - print(f' {Fore.GREEN}q{Fore.RESET}. Cancel SSO login') - print() + logging.info(f'\n{Fore.YELLOW}Device Approval Required for SSO{Fore.RESET}\n') + logging.info(f'{Fore.CYAN}Select an approval method:{Fore.RESET}') + logging.info(f' {Fore.GREEN}1{Fore.RESET}. Keeper Push - Send a push notification to your device') + logging.info(f' {Fore.GREEN}2{Fore.RESET}. Admin Approval - Request your admin to approve this device') + logging.info('') + logging.info(f' {Fore.GREEN}r{Fore.RESET}. Resume SSO login after device is approved') + logging.info(f' {Fore.GREEN}q{Fore.RESET}. Cancel SSO login') + logging.info('') self._show_sso_data_key_help = False while True: @@ -384,4 +382,4 @@ def on_sso_data_key(self, step): elif answer == '2': step.request_data_key(login_steps.DataKeyShareChannel.AdminApproval) else: - print(f'Action \"{answer}\" is not supported.') + logging.warning(f'Action \"{answer}\" is not supported.') diff --git a/keepercommander/commands/_supershell_impl.py b/keepercommander/commands/_supershell_impl.py index 8c0fc8e92..8ddaef6ba 100644 --- a/keepercommander/commands/_supershell_impl.py +++ b/keepercommander/commands/_supershell_impl.py @@ -17,8 +17,25 @@ from pathlib import Path from typing import Optional, List, Dict, Any import pyperclip +from pyperclip import PyperclipException from rich.markup import escape as rich_escape + +def safe_copy_to_clipboard(text: str) -> tuple[bool, str]: + """Safely copy text to clipboard, handling missing clipboard on remote/headless systems. + + Returns: + (True, "") on success + (False, error_message) on failure + """ + try: + pyperclip.copy(text) + return True, "" + except PyperclipException: + return False, "Clipboard not available (no X11/Wayland)" + except Exception as e: + return False, str(e) + # Import from refactored modules from .supershell.themes import COLOR_THEMES from .supershell.utils import load_preferences, save_preferences, strip_ansi_codes @@ -39,7 +56,7 @@ from textual.app import App, ComposeResult from textual.containers import Container, Horizontal, Vertical, VerticalScroll, Center, Middle -from textual.widgets import Tree, DataTable, Footer, Header, Static, Input, Label, Button +from textual.widgets import Tree, DataTable, Footer, Header, Static, Input, Label, Button, TextArea from textual.binding import Binding from textual.screen import Screen, ModalScreen from textual.reactive import reactive @@ -47,7 +64,334 @@ from textual.message import Message from textual.timer import Timer from rich.text import Text -from textual.events import Click, MouseDown, Paste +from textual.events import Click, MouseDown, MouseUp, MouseMove, Paste + +# === DEBUG EVENT LOGGING === +# Set to True to log all mouse/keyboard events to /tmp/supershell_debug.log +# tail -f /tmp/supershell_debug.log to watch events in real-time +DEBUG_EVENTS = False +_debug_log_file = None + +def _debug_log(msg: str): + """Log debug message to /tmp/supershell_debug.log if DEBUG_EVENTS is True.""" + if not DEBUG_EVENTS: + return + global _debug_log_file + try: + if _debug_log_file is None: + _debug_log_file = open('/tmp/supershell_debug.log', 'a') + import datetime + timestamp = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3] + _debug_log_file.write(f"[{timestamp}] {msg}\n") + _debug_log_file.flush() + except Exception as e: + pass # Silently fail if logging fails +# === END DEBUG EVENT LOGGING === + + +class AutoCopyTextArea(TextArea): + """TextArea that auto-copies selected text to clipboard on mouse release. + + Behavior matches standard Linux terminal: + - Click and drag to select text + - Double-click to select word, drag to extend from word boundaries + - On mouse up, automatically copy selection to clipboard + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + import time + self._last_click_time = 0.0 + self._last_click_pos = (0, 0) + self._word_select_mode = False + self._word_anchor_start = None # (row, col) + self._word_anchor_end = None # (row, col) + + async def _on_mouse_down(self, event: MouseDown) -> None: + """Handle mouse down - detect double-click for word selection.""" + import time + current_time = time.time() + current_pos = (event.x, event.y) + + # Check for double-click (within 500ms and reasonably close position) + time_ok = (current_time - self._last_click_time) < 0.5 + pos_ok = (abs(current_pos[0] - self._last_click_pos[0]) <= 10 and + abs(current_pos[1] - self._last_click_pos[1]) <= 5) + is_double_click = time_ok and pos_ok + + # Update click tracking + self._last_click_time = current_time + self._last_click_pos = current_pos + + if is_double_click: + # Double-click: select word and prepare for drag + self._select_word_at_position(event) + else: + # Single click: reset word mode and do normal selection + self._word_select_mode = False + self._word_anchor_start = None + self._word_anchor_end = None + await super()._on_mouse_down(event) + + def _select_word_at_position(self, event: MouseDown) -> None: + """Select the word at the mouse position.""" + try: + location = self.get_target_document_location(event) + row, col = location + + lines = self.text.split('\n') + if row >= len(lines): + return + line = lines[row] + if col > len(line): + col = len(line) + + # Find word boundaries (whitespace-delimited) + start = col + while start > 0 and not line[start - 1].isspace(): + start -= 1 + + end = col + while end < len(line) and not line[end].isspace(): + end += 1 + + if start == end: + # No word at this position + return + + # Store anchors for potential drag extension + self._word_anchor_start = (row, start) + self._word_anchor_end = (row, end) + self._word_select_mode = True + + # Select the word + from textual.widgets.text_area import Selection + self.selection = Selection((row, start), (row, end)) + + # Set up for potential drag (like parent's _on_mouse_down) + self._selecting = True + self.capture_mouse() + self._pause_blink(visible=False) + self.history.checkpoint() + + except Exception as e: + _debug_log(f"AutoCopyTextArea._select_word_at_position error: {e}") + # On error, fall back to normal behavior + self._word_select_mode = False + + async def _on_mouse_move(self, event: MouseMove) -> None: + """Handle mouse move - extend selection if dragging.""" + if not self._selecting: + return + + try: + target = self.get_target_document_location(event) + from textual.widgets.text_area import Selection + + if self._word_select_mode and self._word_anchor_start: + # Word-select mode: anchor to original word boundaries + anchor_start = self._word_anchor_start + anchor_end = self._word_anchor_end + + if target < anchor_start: + self.selection = Selection(target, anchor_end) + elif target > anchor_end: + self.selection = Selection(anchor_start, target) + else: + self.selection = Selection(anchor_start, anchor_end) + else: + # Normal drag: extend from original click position + selection_start, _ = self.selection + self.selection = Selection(selection_start, target) + except Exception: + pass + + async def _on_mouse_up(self, event: MouseUp) -> None: + """Handle mouse up - finalize selection and copy.""" + # Clean up word select state + self._word_select_mode = False + + # Let parent finalize selection mode + self._end_mouse_selection() + + # Always try to copy - _auto_copy_if_selected checks if there's actual selection + self._auto_copy_if_selected() + + def _on_click(self, event: Click) -> None: + """Handle click events - double-click selects and copies word.""" + # Double-click: select word and copy (backup for mouse_down detection) + if event.chain >= 2: + try: + location = self.get_target_document_location(event) + row, col = location + + lines = self.text.split('\n') + if row < len(lines): + line = lines[row] + if col > len(line): + col = len(line) + + # Find word boundaries + start = col + while start > 0 and not line[start - 1].isspace(): + start -= 1 + end = col + while end < len(line) and not line[end].isspace(): + end += 1 + + if start < end: + word = line[start:end] + # Select and copy the word + from textual.widgets.text_area import Selection + self.selection = Selection((row, start), (row, end)) + # Copy immediately + success, err = safe_copy_to_clipboard(word) + if success: + preview = word[:40] + ('...' if len(word) > 40 else '') + self.app.notify(f"Copied: {preview}", severity="information") + else: + self.app.notify(f"⚠️ {err}", severity="warning") + except Exception: + pass + event.stop() + return + # Let parent handle single clicks + super()._on_click(event) + + def _auto_copy_if_selected(self) -> None: + """Copy selected text to clipboard if any.""" + try: + selected = self.selected_text + _debug_log(f"AutoCopyTextArea: selected_text={selected!r}") + if selected and selected.strip(): + success, err = safe_copy_to_clipboard(selected) + if success: + preview = selected[:40] + ('...' if len(selected) > 40 else '') + preview = preview.replace('\n', ' ') + # Use app.notify() instead of widget's notify() + self.app.notify(f"Copied: {preview}", severity="information") + _debug_log(f"AutoCopyTextArea: Copied to clipboard") + else: + self.app.notify(f"⚠️ {err}", severity="warning") + except Exception as e: + _debug_log(f"AutoCopyTextArea: Error: {e}") + + +class ShellInputTextArea(TextArea): + """TextArea for shell command input with Enter-to-execute behavior. + + Features: + - Enter executes command instead of inserting newline + - Soft wrapping for long commands + - Multi-line display + - Integrates with shell history navigation + """ + + def __init__(self, app_ref: 'SuperShellApp', *args, **kwargs): + # Set defaults for shell input behavior + kwargs.setdefault('soft_wrap', True) + kwargs.setdefault('show_line_numbers', False) + kwargs.setdefault('tab_behavior', 'focus') # Tab cycles focus, not inserts tab + super().__init__(*args, **kwargs) + self._app_ref = app_ref + + async def _on_key(self, event) -> None: + """Intercept keys for shell-specific behavior.""" + # Enter executes command instead of inserting newline + if event.key == "enter": + command = self.text.strip() + self.clear() # Clear immediately for responsiveness + if command: + # Execute asynchronously with loading indicator + self._app_ref._execute_shell_command_async(command) + event.prevent_default() + event.stop() + return + + # Up arrow navigates history + if event.key == "up": + if self._app_ref.shell_command_history: + if self._app_ref.shell_history_index < len(self._app_ref.shell_command_history) - 1: + self._app_ref.shell_history_index += 1 + history_cmd = self._app_ref.shell_command_history[-(self._app_ref.shell_history_index + 1)] + self.clear() + self.insert(history_cmd) + event.prevent_default() + event.stop() + return + + # Down arrow navigates history + if event.key == "down": + if self._app_ref.shell_history_index > 0: + self._app_ref.shell_history_index -= 1 + history_cmd = self._app_ref.shell_command_history[-(self._app_ref.shell_history_index + 1)] + self.clear() + self.insert(history_cmd) + elif self._app_ref.shell_history_index == 0: + self._app_ref.shell_history_index = -1 + self.clear() + event.prevent_default() + event.stop() + return + + # Ctrl+U clears the input (bash-like) + if event.key == "ctrl+u": + self.clear() + self._app_ref.shell_history_index = -1 + event.prevent_default() + event.stop() + return + + # Ctrl+D closes shell pane + if event.key == "ctrl+d": + self._app_ref._close_shell_pane() + event.prevent_default() + event.stop() + return + + # Escape unfocuses the input + if event.key == "escape": + self._app_ref.shell_input_active = False + tree = self._app_ref.query_one("#folder_tree") + tree.focus() + self._app_ref._update_status("Shell open | Tab to cycle | press Enter in shell to run commands") + event.prevent_default() + event.stop() + return + + # Tab cycles to search mode + if event.key == "tab": + from textual.widgets import Tree + self._app_ref.shell_input_active = False + self._app_ref.search_input_active = True + tree = self._app_ref.query_one("#folder_tree", Tree) + tree.add_class("search-input-active") + search_bar = self._app_ref.query_one("#search_bar") + search_bar.add_class("search-active") + tree.focus() # Search mode works with tree focused + self._app_ref._update_search_display(perform_search=False) + self._app_ref._update_status("Type to search | Tab to tree | Ctrl+U to clear") + event.prevent_default() + event.stop() + return + + # Shift+Tab cycles to shell output pane + if event.key == "shift+tab": + from textual.widgets import TextArea + self._app_ref.shell_input_active = False + try: + shell_output = self._app_ref.query_one("#shell_output_content", TextArea) + shell_output.focus() + except Exception: + pass + self._app_ref._update_status("Shell output | j/k to scroll | Tab to input | Shift+Tab to detail") + event.prevent_default() + event.stop() + return + + # Let parent TextArea handle all other keys (typing, backspace, cursor movement, etc.) + await super()._on_key(event) + from ..commands.base import Command @@ -252,13 +596,33 @@ class SuperShellApp(App): #record_detail:focus { background: #0a0a0a; - border: solid #333333; } #record_detail:focus-within { background: #0a0a0a; } + /* Focus indicators - green left border shows which pane is active */ + #folder_panel:focus-within { + border-left: solid #00cc00; + } + + #record_panel:focus-within { + border-left: solid #00cc00; + } + + #shell_output_content:focus { + border-left: solid #00cc00; + } + + #shell_input_container:focus-within { + border-left: solid #00cc00; + } + + #search_bar.search-active { + border-left: solid #00cc00; + } + #status_bar { dock: bottom; height: 1; @@ -309,27 +673,73 @@ class SuperShellApp(App): border-bottom: solid #333333; } - #shell_output { + #shell_output_content { height: 1fr; - overflow-y: auto; + background: #000000; + color: #ffffff; + border: none; padding: 0 1; + } + + /* Theme-specific selection colors for shell output */ + #shell_output_content.theme-green .text-area--selection { + background: #004400; + } + #shell_output_content.theme-blue .text-area--selection { + background: #002244; + } + #shell_output_content.theme-magenta .text-area--selection { + background: #330033; + } + #shell_output_content.theme-yellow .text-area--selection { + background: #333300; + } + #shell_output_content.theme-white .text-area--selection { + background: #444444; + } + /* Default fallback */ + #shell_output_content .text-area--selection { + background: #004400; + } + + /* Shell input container with prompt and TextArea */ + #shell_input_container { + height: auto; + min-height: 3; + max-height: 6; background: #000000; + border-top: solid #333333; + border-bottom: solid #333333; + padding: 0 1; } - #shell_output_content { + #shell_prompt { + width: 2; + height: 100%; background: #000000; - color: #ffffff; + color: #00ff00; + padding: 0; } - #shell_input_line { - height: 1; - background: #111111; + /* Shell input area - multi-line TextArea for command entry */ + #shell_input_area { + width: 1fr; + height: auto; + min-height: 1; + max-height: 5; + background: #000000; color: #00ff00; - padding: 0 1; + border: none; + padding: 0; + } + + #shell_input_area:focus { + background: #000000; } - #shell_pane:focus-within #shell_input_line { - background: #1a1a2e; + #shell_input_area .text-area--cursor { + color: #00ff00; + background: #00ff00; } """ @@ -400,6 +810,7 @@ def __init__(self, params): self.shell_input_active = False self.shell_command_history = [] # For up/down arrow navigation self.shell_history_index = -1 + # Shell output uses TextArea widget with built-in selection support # Load color theme from preferences prefs = load_preferences() self.color_theme = prefs.get('color_theme', 'green') @@ -459,10 +870,20 @@ def restore_expanded(node): # Update header info (email/username colors) self._update_header_info_display() - + # Update CSS dynamically for tree selection/hover self._apply_theme_css() + # Update shell output selection color theme + try: + shell_output = self.query_one("#shell_output_content") + # Remove old theme classes and add new one + for old_theme in COLOR_THEMES.keys(): + shell_output.remove_class(f"theme-{old_theme}") + shell_output.add_class(f"theme-{theme_name}") + except Exception: + pass # Shell pane might not exist yet + def notify(self, message, *, title="", severity="information", timeout=1.5): """Override notify to use faster timeout (default 1.5s instead of 5s)""" super().notify(message, title=title, severity=severity, timeout=timeout) @@ -547,9 +968,12 @@ def compose(self) -> ComposeResult: # Shell pane - hidden by default, shown when :command or Ctrl+\ pressed with Vertical(id="shell_pane"): yield Static("", id="shell_header") - with VerticalScroll(id="shell_output"): - yield Static("", id="shell_output_content") - yield Static("", id="shell_input_line") + # AutoCopyTextArea auto-copies selected text on mouse release + yield AutoCopyTextArea("", id="shell_output_content", read_only=True, classes=f"theme-{self.color_theme}") + # Shell input line with prompt and TextArea + with Horizontal(id="shell_input_container"): + yield Static("❯ ", id="shell_prompt") + yield ShellInputTextArea(self, "", id="shell_input_area") # Status bar at very bottom yield Static("", id="status_bar") @@ -3100,42 +3524,65 @@ def _update_shortcuts_bar(self, record_selected: bool = False, folder_selected: @on(Click, "#search_bar, #search_display") def on_search_bar_click(self, event: Click) -> None: """Activate search mode when search bar is clicked""" + _debug_log(f"CLICK: search_bar x={event.x} y={event.y} button={event.button} " + f"shift={event.shift} ctrl={event.ctrl} meta={event.meta}") tree = self.query_one("#folder_tree", Tree) + + # Deactivate shell input if it was active + if self.shell_input_active: + self.shell_input_active = False + try: + shell_input = self.query_one("#shell_input_area", ShellInputTextArea) + shell_input.blur() # Remove focus from shell input + except Exception: + pass + self.search_input_active = True tree.add_class("search-input-active") + search_bar = self.query_one("#search_bar") + search_bar.add_class("search-active") + tree.focus() # Focus tree so keyboard events go to search handler self._update_search_display(perform_search=False) # Don't change tree when entering search self._update_status("Type to search | Tab to navigate | Ctrl+U to clear") event.stop() + _debug_log(f"CLICK: search_bar -> stopped") @on(Click, "#user_info") def on_user_info_click(self, event: Click) -> None: """Show whoami info when user info is clicked""" + _debug_log(f"CLICK: user_info x={event.x} y={event.y}") self._display_whoami_info() event.stop() + _debug_log(f"CLICK: user_info -> stopped") @on(Click, "#device_status_info") def on_device_status_click(self, event: Click) -> None: """Show device info when Stay Logged In / Logout section is clicked""" + _debug_log(f"CLICK: device_status_info x={event.x} y={event.y}") self._display_device_info() event.stop() + _debug_log(f"CLICK: device_status_info -> stopped") - @on(Click, "#shell_pane, #shell_output, #shell_output_content, #shell_input_line, #shell_header") + @on(Click, "#shell_pane, #shell_input_area, #shell_header") def on_shell_pane_click(self, event: Click) -> None: - """Handle clicks in shell pane - activate shell input and stop propagation. + """Handle clicks in shell pane (not output area) - activate shell input.""" + _debug_log(f"CLICK: shell_pane x={event.x} y={event.y} button={event.button}") - This prevents clicks in the shell pane from bubbling up and triggering - expensive operations in other parts of the UI. - Note: Native terminal text selection requires Shift+click (terminal-dependent). - """ + # Normal click - activate and focus shell input if self.shell_pane_visible: - # Activate shell input when clicking anywhere in shell pane - if not self.shell_input_active: - self.shell_input_active = True - self._update_shell_input_display() + self.shell_input_active = True + try: + shell_input = self.query_one("#shell_input_area", ShellInputTextArea) + shell_input.focus() + except Exception: + pass event.stop() + _debug_log(f"CLICK: shell_pane -> stopped") def on_paste(self, event: Paste) -> None: """Handle paste events (Cmd+V on Mac, Ctrl+V on Windows/Linux)""" + _debug_log(f"PASTE: text={event.text!r} shell_input_active={self.shell_input_active}") + # Shell input TextArea handles its own paste - don't interfere if self.search_input_active and event.text: # Append pasted text to search input (strip newlines) pasted_text = event.text.replace('\n', ' ').replace('\r', '') @@ -3151,6 +3598,8 @@ def on_tree_node_selected(self, event: Tree.NodeSelected): self.search_input_active = False tree = self.query_one("#folder_tree", Tree) tree.remove_class("search-input-active") + search_bar = self.query_one("#search_bar") + search_bar.remove_class("search-active") # Update search display to remove cursor search_display = self.query_one("#search_display", Static) if self.search_input_text: @@ -3247,7 +3696,11 @@ def _update_search_display(self, perform_search=True): # Update display with blinking cursor at end if self.search_input_text: # Show text with blinking cursor (escape special chars for Rich markup) - display_text = f"> {rich_escape(self.search_input_text)}[blink]▎[/blink]" + escaped_text = rich_escape(self.search_input_text) + # Double trailing backslash so it doesn't escape the [blink] tag + if escaped_text.endswith('\\'): + escaped_text += '\\' + display_text = f"> {escaped_text}[blink]▎[/blink]" else: # Show prompt with blinking cursor (ready to type) display_text = "> [blink]▎[/blink]" @@ -3286,8 +3739,14 @@ def on_key(self, event): Keyboard handling is delegated to specialized handlers in supershell/handlers/keyboard.py for better organization and testing. """ + _debug_log(f"KEY: key={event.key!r} char={event.character!r} " + f"shell_visible={self.shell_pane_visible} shell_input_active={self.shell_input_active} " + f"search_active={self.search_input_active}") + # Dispatch to the keyboard handler chain - if keyboard_dispatcher.dispatch(event, self): + handled = keyboard_dispatcher.dispatch(event, self) + _debug_log(f"KEY: handled={handled}") + if handled: return # Event was not handled by any handler - let it propagate @@ -3379,19 +3838,32 @@ def _open_shell_pane(self, command: str = None): self.shell_pane_visible = True self.shell_input_active = True - self.shell_input_text = "" + self.shell_input_text = "" # Keep for compatibility + self._shell_executing = False # Track if command is executing - # Update shell header with theme colors + # Update shell header with theme colors and prompt self._update_shell_header() - # Initialize the input display - self._update_shell_input_display() + # Initialize the prompt with green ❯ + try: + prompt = self.query_one("#shell_prompt", Static) + prompt.update("[green]❯[/green] ") + except Exception: + pass + + # Focus the input TextArea + try: + shell_input = self.query_one("#shell_input_area", ShellInputTextArea) + shell_input.focus() + shell_input.clear() + except Exception: + pass # If a command was provided, execute it immediately if command: - self._execute_shell_command(command) + self._execute_shell_command_async(command) - self._update_status("Shell open | Enter to run | quit/q/Ctrl+D to close") + self._update_status("Shell open | Enter to run | Up/Down for history | Ctrl+D to close") def _close_shell_pane(self): """Close the shell pane and return to normal view""" @@ -3403,12 +3875,176 @@ def _close_shell_pane(self): self.shell_input_text = "" self.shell_history_index = -1 + # Clear the input TextArea + try: + shell_input = self.query_one("#shell_input_area", ShellInputTextArea) + shell_input.clear() + except Exception: + pass + # Focus tree tree = self.query_one("#folder_tree", Tree) tree.focus() self._update_status("Navigate with j/k | / to search | ? for help") + def _execute_shell_command_async(self, command: str): + """Execute a command asynchronously with loading indicator.""" + # Show spinner in prompt + self._start_shell_spinner() + + # Run the command in a worker thread + self.run_worker( + lambda: self._execute_shell_command_worker(command), + name="shell_command", + exclusive=True, + thread=True + ) + + def _start_shell_spinner(self): + """Start the spinner animation in the shell prompt.""" + self._shell_spinner_frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] + self._shell_spinner_index = 0 + self._shell_executing = True + + # Update prompt to show first spinner frame + try: + prompt = self.query_one("#shell_prompt", Static) + prompt.update(f"[yellow]{self._shell_spinner_frames[0]}[/yellow] ") + except Exception: + pass + + # Start timer for animation + self._shell_spinner_timer = self.set_interval(0.1, self._animate_shell_spinner) + + def _animate_shell_spinner(self): + """Animate the shell spinner.""" + if not self._shell_executing: + return + + self._shell_spinner_index = (self._shell_spinner_index + 1) % len(self._shell_spinner_frames) + try: + prompt = self.query_one("#shell_prompt", Static) + prompt.update(f"[yellow]{self._shell_spinner_frames[self._shell_spinner_index]}[/yellow] ") + except Exception: + pass + + def _stop_shell_spinner(self): + """Stop the spinner and restore the prompt.""" + self._shell_executing = False + if hasattr(self, '_shell_spinner_timer') and self._shell_spinner_timer: + self._shell_spinner_timer.stop() + self._shell_spinner_timer = None + + # Restore normal prompt + try: + prompt = self.query_one("#shell_prompt", Static) + prompt.update("[green]❯[/green] ") + except Exception: + pass + + def _execute_shell_command_worker(self, command: str): + """Worker function that executes the command and returns the result.""" + command = command.strip() + + # Handle quit commands + if command.lower() in ('quit', 'q', 'exit'): + self.call_from_thread(self._stop_shell_spinner) + self.call_from_thread(self._close_shell_pane) + return + + # Block supershell inside supershell + cmd_name = command.split()[0].lower() if command.split() else '' + if cmd_name in ('supershell', 'ss'): + self.shell_history.append((command, "[yellow]Cannot run supershell inside supershell[/yellow]")) + self.call_from_thread(self._stop_shell_spinner) + self.call_from_thread(self._update_shell_output_display) + self.call_from_thread(self._scroll_shell_to_bottom) + return + + # Handle clear command + if command.lower() == 'clear': + self.shell_history = [] + self.call_from_thread(self._stop_shell_spinner) + self.call_from_thread(self._update_shell_output_display) + return + + # Add to command history for up/down navigation + if command and (not self.shell_command_history or + self.shell_command_history[-1] != command): + self.shell_command_history.append(command) + self.shell_history_index = -1 + + # Capture stdout/stderr for command execution + stdout_buffer = io.StringIO() + stderr_buffer = io.StringIO() + log_buffer = io.StringIO() + old_stdout = sys.stdout + old_stderr = sys.stderr + + # Create a temporary logging handler to capture log output + log_handler = logging.StreamHandler(log_buffer) + log_handler.setLevel(logging.INFO) + log_handler.setFormatter(logging.Formatter('%(message)s')) + root_logger = logging.getLogger() + root_logger.addHandler(log_handler) + + try: + sys.stdout = stdout_buffer + sys.stderr = stderr_buffer + + # Execute via cli.do_command + from ..cli import do_command + result = do_command(self.params, command) + # Some commands return output (e.g., JSON format) instead of printing + if result is not None: + print(result) + + except Exception as e: + stderr_buffer.write(f"Error: {str(e)}\n") + finally: + sys.stdout = old_stdout + sys.stderr = old_stderr + root_logger.removeHandler(log_handler) + + # Get output + output = stdout_buffer.getvalue() + errors = stderr_buffer.getvalue() + log_output = log_buffer.getvalue() + + # Strip ANSI codes + output = strip_ansi_codes(output) + errors = strip_ansi_codes(errors) + log_output = strip_ansi_codes(log_output) + + # Combine output (stdout first, then log output, then errors) + full_output = output.rstrip() + if log_output.strip(): + if full_output: + full_output += "\n" + full_output += log_output.rstrip() + if errors: + if full_output: + full_output += "\n" + full_output += f"[red]{rich_escape(errors.rstrip())}[/red]" + + # Add to history + self.shell_history.append((command, full_output)) + + # Stop spinner and update display on the main thread + self.call_from_thread(self._stop_shell_spinner) + self.call_from_thread(self._update_shell_output_display) + self.call_from_thread(self._scroll_shell_to_bottom) + + def _scroll_shell_to_bottom(self): + """Scroll shell output to bottom.""" + try: + shell_output = self.query_one("#shell_output_content", TextArea) + shell_output.action_cursor_line_end() + shell_output.scroll_end(animate=False) + except Exception: + pass + def _execute_shell_command(self, command: str): """Execute a Keeper command in the shell pane and display output""" command = command.strip() @@ -3436,35 +4072,55 @@ def _execute_shell_command(self, command: str): # Capture stdout/stderr for command execution stdout_buffer = io.StringIO() stderr_buffer = io.StringIO() + log_buffer = io.StringIO() old_stdout = sys.stdout old_stderr = sys.stderr + # Create a temporary logging handler to capture log output + log_handler = logging.StreamHandler(log_buffer) + log_handler.setLevel(logging.INFO) + log_handler.setFormatter(logging.Formatter('%(message)s')) + root_logger = logging.getLogger() + root_logger.addHandler(log_handler) + try: sys.stdout = stdout_buffer sys.stderr = stderr_buffer # Execute via cli.do_command from ..cli import do_command - do_command(self.params, command) + result = do_command(self.params, command) + # Some commands return output (e.g., JSON format) instead of printing + if result is not None: + print(result) except Exception as e: stderr_buffer.write(f"Error: {str(e)}\n") finally: sys.stdout = old_stdout sys.stderr = old_stderr + root_logger.removeHandler(log_handler) # Get output output = stdout_buffer.getvalue() errors = stderr_buffer.getvalue() + log_output = log_buffer.getvalue() # Strip ANSI codes output = strip_ansi_codes(output) errors = strip_ansi_codes(errors) + log_output = strip_ansi_codes(log_output) - # Combine output + # Combine output (stdout first, then log output, then errors) full_output = output.rstrip() + if log_output.strip(): + if full_output: + full_output += "\n" + full_output += log_output.rstrip() if errors: - full_output += f"\n[red]{rich_escape(errors.rstrip())}[/red]" + if full_output: + full_output += "\n" + full_output += f"[red]{rich_escape(errors.rstrip())}[/red]" # Add to history self.shell_history.append((command, full_output)) @@ -3475,52 +4131,47 @@ def _execute_shell_command(self, command: str): # Scroll to bottom (defer to ensure content is rendered) def scroll_to_bottom(): try: - shell_output = self.query_one("#shell_output", VerticalScroll) + shell_output = self.query_one("#shell_output_content", TextArea) + # Move cursor to end to scroll to bottom + shell_output.action_cursor_line_end() shell_output.scroll_end(animate=False) except Exception: pass self.call_after_refresh(scroll_to_bottom) def _update_shell_output_display(self): - """Update the shell output area with command history""" + """Update the shell output area with command history (using TextArea for selection support)""" try: - shell_output_content = self.query_one("#shell_output_content", Static) + shell_output_content = self.query_one("#shell_output_content", TextArea) except Exception: return - t = self.theme_colors lines = [] for cmd, output in self.shell_history: - # Show prompt and command + # Show prompt and command (plain text - TextArea doesn't support Rich markup) prompt = self._get_shell_prompt() - lines.append(f"[{t['primary']}]{prompt}[/{t['primary']}]{rich_escape(cmd)}") + lines.append(f"{prompt}{cmd}") # Show output (with blank line separator only if there's output) if output.strip(): - lines.append(output) + # Strip Rich markup tags from output, but preserve JSON arrays + # Rich tags look like [red], [/bold], [#ffffff], [bold red] - start with letter, /, or # + # JSON arrays contain quotes, braces, colons, etc. + plain_output = re.sub(r'\[/?[a-zA-Z#][a-zA-Z0-9_ #]*\]', '', output) + lines.append(plain_output) lines.append("") # Blank line after output - shell_output_content.update("\n".join(lines)) + # TextArea uses .text property, not .update() + shell_output_content.text = "\n".join(lines) + _debug_log(f"_update_shell_output_display: updated TextArea with {len(lines)} lines") def _update_shell_input_display(self): - """Update the shell input line with prompt and current input text""" - try: - shell_input = self.query_one("#shell_input_line", Static) - except Exception: - return + """Legacy method - now a no-op since ShellInputTextArea handles its own display. - t = self.theme_colors - prompt = self._get_shell_prompt() - - if self.shell_input_active: - cursor = "[reverse] [/reverse]" - else: - cursor = "" - - shell_input.update( - f"[{t['primary']}]{prompt}[/{t['primary']}]" - f"{rich_escape(self.shell_input_text)}{cursor}" - ) + Kept for compatibility with any code that might still call it. + The prompt is now shown in the shell header via _update_shell_header(). + """ + pass def _get_shell_prompt(self) -> str: """Get the shell prompt based on current folder context""" @@ -3537,16 +4188,17 @@ def _get_shell_prompt(self) -> str: return "My Vault> " def _update_shell_header(self): - """Update shell header bar with theme colors""" + """Update shell header bar with theme colors and current prompt context""" try: shell_header = self.query_one("#shell_header", Static) except Exception: return t = self.theme_colors + prompt = self._get_shell_prompt() shell_header.update( - f"[bold {t['primary']}]Keeper Shell[/bold {t['primary']}] " - f"[{t['text_dim']}](quit/q/Ctrl+D to close)[/{t['text_dim']}]" + f"[bold {t['primary']}]{prompt}[/bold {t['primary']}]" + f"[{t['text_dim']}] (Enter to run | Up/Down for history | Ctrl+D to close)[/{t['text_dim']}]" ) def check_action(self, action: str, parameters: tuple) -> bool | None: @@ -3629,6 +4281,16 @@ def action_toggle_unmask(self): def action_copy_password(self): """Copy password of selected record to clipboard using clipboard-copy command (generates audit event)""" if self.selected_record and self.selected_record in self.records: + # First check if clipboard is available (to distinguish from "no password" errors) + try: + pyperclip.copy("") # Test clipboard availability + except PyperclipException: + self.notify("⚠️ Clipboard not available (no X11/Wayland)", severity="warning") + return + except Exception as e: + self.notify(f"⚠️ {e}", severity="warning") + return + try: # Use ClipboardCommand to copy password - this generates the audit event cc = ClipboardCommand() @@ -3646,8 +4308,11 @@ def action_copy_username(self): if self.selected_record and self.selected_record in self.records: record = self.records[self.selected_record] if 'login' in record: - pyperclip.copy(record['login']) - self.notify("👤 Username copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(record['login']) + if success: + self.notify("👤 Username copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: self.notify("⚠️ No username found for this record", severity="warning") else: @@ -3658,8 +4323,11 @@ def action_copy_url(self): if self.selected_record and self.selected_record in self.records: record = self.records[self.selected_record] if 'login_url' in record: - pyperclip.copy(record['login_url']) - self.notify("🔗 URL copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(record['login_url']) + if success: + self.notify("🔗 URL copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: self.notify("⚠️ No URL found for this record", severity="warning") else: @@ -3668,11 +4336,17 @@ def action_copy_url(self): def action_copy_uid(self): """Copy UID of selected record or folder to clipboard""" if self.selected_record: - pyperclip.copy(self.selected_record) - self.notify("📋 Record UID copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(self.selected_record) + if success: + self.notify("📋 Record UID copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") elif self.selected_folder: - pyperclip.copy(self.selected_folder) - self.notify("📋 Folder UID copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(self.selected_folder) + if success: + self.notify("📋 Folder UID copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: self.notify("⚠️ No record or folder selected", severity="warning") @@ -3747,8 +4421,11 @@ def action_copy_record(self): if self.view_mode == 'json': # Copy as JSON formatted = json.dumps(app_data, indent=2) - pyperclip.copy(formatted) - self.notify("📋 Secrets Manager app JSON copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(formatted) + if success: + self.notify("📋 Secrets Manager app JSON copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: # Copy as formatted text (detail view) lines = [] @@ -3783,8 +4460,11 @@ def action_copy_record(self): lines.append("") formatted = "\n".join(lines) - pyperclip.copy(formatted) - self.notify("📋 Secrets Manager app details copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(formatted) + if success: + self.notify("📋 Secrets Manager app details copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: # Regular record handling record_data = self.records.get(self.selected_record, {}) @@ -3796,22 +4476,28 @@ def action_copy_record(self): output = strip_ansi_codes(output) json_obj = json.loads(output) formatted = json.dumps(json_obj, indent=2) - pyperclip.copy(formatted) - # Generate audit event since JSON contains the password - if has_password: - self.params.queue_audit_event('copy_password', record_uid=self.selected_record) - self.notify("📋 JSON copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(formatted) + if success: + # Generate audit event since JSON contains the password + if has_password: + self.params.queue_audit_event('copy_password', record_uid=self.selected_record) + self.notify("📋 JSON copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: # Copy formatted text (without Rich markup) content = self._format_record_for_tui(self.selected_record) # Strip Rich markup for plain text clipboard import re plain = re.sub(r'\[/?[^\]]+\]', '', content) - pyperclip.copy(plain) - # Generate audit event if record has password (detail view includes password) - if has_password: - self.params.queue_audit_event('copy_password', record_uid=self.selected_record) - self.notify("📋 Record contents copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(plain) + if success: + # Generate audit event if record has password (detail view includes password) + if has_password: + self.params.queue_audit_event('copy_password', record_uid=self.selected_record) + self.notify("📋 Record contents copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") except Exception as e: logging.error(f"Error copying record: {e}", exc_info=True) self.notify("⚠️ Failed to copy record contents", severity="error") diff --git a/keepercommander/commands/aram.py b/keepercommander/commands/aram.py index ad7eeef9f..a48dc3214 100644 --- a/keepercommander/commands/aram.py +++ b/keepercommander/commands/aram.py @@ -173,6 +173,152 @@ API_EVENT_RAW_ROW_LIMIT = 1000 +def fetch_audit_events(params, audit_filter, columns=None, aggregate=None, report_type='span', limit=None, order=None): + # type: (KeeperParams, Dict[str, Any], Optional[List[str]], Optional[List[str]], str, Optional[int], Optional[str]) -> List[Dict[str, Any]] + """ + Fetch audit events with pagination support. + + This function handles pagination for both raw and span/consolidated reports, + making it suitable for enterprises with large numbers of users/events. + + Args: + params: KeeperParams object + audit_filter: dict with filter criteria (audit_event_type, created, username, etc.) + columns: list of columns for non-raw reports (default: ['username']) + aggregate: list of aggregates for non-raw reports (default: ['last_created'] for span) + report_type: 'raw', 'span', 'hour', 'day', 'week', 'month' (default: 'span') + limit: maximum number of events to return (None for all) + order: 'ascending' or 'descending' (default: None, uses API default) + + Returns: + list of event dictionaries + """ + rq = { + 'command': 'get_audit_event_reports', + 'report_type': report_type, + 'scope': 'enterprise' if params.enterprise else 'user' + } + + if columns: + rq['columns'] = columns + elif report_type != 'raw': + rq['columns'] = ['username'] + + if aggregate: + rq['aggregate'] = aggregate + elif report_type == 'span': + rq['aggregate'] = ['last_created'] + + api_row_limit = API_EVENT_SUMMARY_ROW_LIMIT if report_type != 'raw' else API_EVENT_RAW_ROW_LIMIT + rq_limit = api_row_limit if limit is None else min(limit, api_row_limit) if limit > 0 else api_row_limit + rq['limit'] = rq_limit + + if order: + rq['order'] = order + + if audit_filter: + rq['filter'] = copy.deepcopy(audit_filter) + + events = [] + reqs = [rq] + max_iterations = 1000 + iteration_count = 0 + + while reqs and iteration_count < max_iterations: + iteration_count += 1 + rss = api.execute_batch(params, reqs) + next_reqs = [] + should_stop = False + + for idx, rs in enumerate(rss): + batch_events = rs.get('audit_event_overview_report_rows', []) + if not batch_events or not isinstance(batch_events, list): + continue + + events.extend(batch_events) + + # Check if user limit reached + if limit is not None and limit > 0 and len(events) >= limit: + events = events[:limit] + should_stop = True + break + + # Check if we need to paginate + batch_limit = api_row_limit if report_type != 'raw' else API_EVENT_RAW_ROW_LIMIT + if len(batch_events) >= batch_limit: + if idx >= len(reqs): + continue + + current_req = reqs[idx] + + # Determine timestamp field based on report type + timestamp_field = None + if report_type == 'span': + if batch_events and 'last_created' in batch_events[-1]: + timestamp_field = 'last_created' + elif batch_events and 'first_created' in batch_events[-1]: + timestamp_field = 'first_created' + elif batch_events and 'created' in batch_events[-1]: + timestamp_field = 'created' + else: + timestamp_field = 'created' + + if timestamp_field and batch_events and timestamp_field in batch_events[-1]: + try: + asc = current_req.get('order') == 'ascending' + first_key, last_key = ('min', 'max') if asc else ('max', 'min') + req_filter = copy.deepcopy(current_req.get('filter', {})) + req_period = req_filter.get('created', {}) + + timestamp_value = batch_events[-1][timestamp_field] + period = {first_key: int(timestamp_value) if isinstance(timestamp_value, (int, float, str)) else timestamp_value} + + if not isinstance(req_period, dict) or req_period.get(last_key) is None: + # Get the boundary timestamp + last_rq = {**current_req} + reverse = 'descending' if asc else 'ascending' + last_rq['order'] = reverse + last_rq['limit'] = 1 + rs_last = api.communicate(params, last_rq) + last_row_events = rs_last.get('audit_event_overview_report_rows') + if last_row_events: + last_row = last_row_events[0] + last_timestamp = last_row.get(timestamp_field, last_row.get('created', 0)) + period[last_key] = int(last_timestamp) if isinstance(last_timestamp, (int, float, str)) else last_timestamp + else: + period[last_key] = req_period.get(last_key) + + req_filter['created'] = period + next_req = {**current_req} + next_req['filter'] = req_filter + + if limit is not None and limit > 0: + missing = limit - len(events) + if missing <= 0: + should_stop = True + break + elif missing < batch_limit: + next_req['limit'] = missing + + next_reqs.append(next_req) + except (ValueError, TypeError, KeyError) as e: + logging.warning(f"Error processing pagination timestamp: {e}") + continue + + if should_stop: + break + + if should_stop: + break + + reqs = next_reqs + + if iteration_count >= max_iterations: + logging.warning(f"Pagination stopped after reaching maximum iterations ({max_iterations})") + + return events + + def load_syslog_templates(params): global syslog_templates if syslog_templates is None: @@ -1643,102 +1789,32 @@ def normalize_param(param_value): if columns: fields.extend(columns) - max_iterations = 1000 # Safety limit to prevent infinite loops - iteration_count = 0 - - while reqs and iteration_count < max_iterations: - iteration_count += 1 - next_reqs = [] - should_stop = False - - for idx, rs in enumerate(rss): - events = rs.get('audit_event_overview_report_rows', []) - if not events or not isinstance(events, list): - continue - for event in events: - row = [] - for f in fields: - if f in event: - row.append( - self.convert_value(f, event[f], report_type=report_type, details=details, params=params) - ) - elif f in audit_report.fields_to_uid_name: - row.append(self.resolve_lookup(params, f, event)) - else: - row.append('') - table.append(row) - - if len(events) >= API_EVENT_SUMMARY_ROW_LIMIT: - if idx >= len(reqs): - logging.warning(f"Index mismatch in pagination: idx={idx}, len(reqs)={len(reqs)}") - continue - - current_req = reqs[idx] - - timestamp_field = None - if report_type == 'span': - if events and 'last_created' in events[-1]: - timestamp_field = 'last_created' - elif events and 'first_created' in events[-1]: - timestamp_field = 'first_created' - elif events and 'created' in events[-1]: - timestamp_field = 'created' - else: - timestamp_field = 'created' - - if timestamp_field and events and timestamp_field in events[-1]: - try: - asc = current_req.get('order') == 'ascending' - first_key, last_key = ('min', 'max') if asc else ('max', 'min') - req_filter = current_req.get('filter', {}) - req_period = req_filter.get('created', {}) - - timestamp_value = events[-1][timestamp_field] - period = {first_key: int(timestamp_value) if isinstance(timestamp_value, (int, float, str)) else timestamp_value} - - if not isinstance(req_period, dict) or req_period.get(last_key) is None: - last_rq = {**current_req} - reverse = 'descending' if asc else 'ascending' - last_rq['order'] = reverse - last_rq['limit'] = 1 - rs_last = api.communicate(params, last_rq) - last_row_events = rs_last.get('audit_event_overview_report_rows') - if last_row_events: - last_row = last_row_events[0] - last_timestamp = last_row.get(timestamp_field, last_row.get('created', 0)) - period[last_key] = int(last_timestamp) if isinstance(last_timestamp, (int, float, str)) else last_timestamp - else: - period[last_key] = req_period.get(last_key) - - req_filter['created'] = period - next_req = {**current_req} - next_req['filter'] = req_filter - - if user_limit and user_limit > 0: - missing = user_limit - len(table) - if missing <= 0: - should_stop = True - break - elif missing < API_EVENT_SUMMARY_ROW_LIMIT: - next_req['limit'] = missing - - next_reqs.append(next_req) - except (ValueError, TypeError, KeyError) as e: - logging.warning(f"Error processing pagination timestamp: {e}") - continue - - if should_stop: - break - - if should_stop: - break - - reqs = next_reqs - if reqs: - rss = api.execute_batch(params, reqs) - - if iteration_count >= max_iterations: - logging.warning(f"Pagination stopped after reaching maximum iterations ({max_iterations})") + # Use the fetch_audit_events function for pagination + all_events = [] + for req in reqs: + events = fetch_audit_events( + params, + audit_filter=req.get('filter', {}), + columns=req.get('columns'), + aggregate=req.get('aggregate'), + report_type=report_type, + limit=user_limit, + order=req.get('order') + ) + all_events.extend(events) + + for event in all_events: + row = [] + for f in fields: + if f in event: + row.append( + self.convert_value(f, event[f], report_type=report_type, details=details, params=params) + ) + elif f in audit_report.fields_to_uid_name: + row.append(self.resolve_lookup(params, f, event)) + else: + row.append('') + table.append(row) table = reporting.filter_rows(table, patterns, use_regex, match_all) return dump_report_data(table, fields, fmt=kwargs.get('format'), filename=kwargs.get('output')) @@ -2036,46 +2112,30 @@ def get_parser(self): # type: () -> Optional[argparse.ArgumentParser] return action_report_parser def execute(self, params, **kwargs): - def cmd_rq(cmd): - return {'command': cmd, 'scope': 'enterprise'} - - def report_rq(query_filter, limit, cols=None, report_type='span'): - rq = { - **cmd_rq('get_audit_event_reports'), - 'report_type': report_type, - 'filter': query_filter, - 'limit': limit - } - - if report_type == 'span': - rq['columns'] = ['username'] if cols is None else cols - rq['aggregate'] = ['last_created'] - - return rq - def get_excluded(candidate_usernames, query_filter, username_field='username'): # type: (Set[str], Dict[str, Any], Optional[str]) -> Set[str] - excluded = set() - req_limit = API_EVENT_SUMMARY_ROW_LIMIT + """ + Get usernames that should be excluded (users who HAVE performed the action). + + """ + if not candidate_usernames: + return set() + cols = [username_field] - def adjust_filter(q_filter, max_ts=0): - if max_ts: - q_filter['created']['max'] = max_ts - return q_filter + # Use the fetch_audit_events function with pagination support + events = fetch_audit_events( + params, + audit_filter=query_filter, + columns=cols, + aggregate=['last_created'], + report_type='span', + limit=None # Get all matching events + ) - done = not candidate_usernames - while not done: - rq = report_rq(query_filter, req_limit, cols, report_type='span') - rs = api.communicate(params, rq) - events = rs['audit_event_overview_report_rows'] - to_exclude = {event.get(username_field, '').lower() for event in events} - excluded.update(to_exclude.intersection(candidate_usernames)) - end = int(events[-1]['last_created']) if events else 0 - done = (len(events) < req_limit - or len(candidate_usernames) == len(excluded) - or query_filter.get('created', {}).get('min', end) >= end) - query_filter = adjust_filter(query_filter, end + 1) if not done else None + # Extract usernames from events that match the candidate usernames + to_exclude = {event.get(username_field, '').lower() for event in events} + excluded = to_exclude.intersection(candidate_usernames) return excluded @@ -2117,9 +2177,14 @@ def get_latest_lock_times(usernames): 'to_username': chunk, 'created': {'min': 0, 'max': now_ts} } - rq = report_rq(query_filter, API_EVENT_SUMMARY_ROW_LIMIT, cols=['to_username'], report_type='span') - rs = api.communicate(params, rq) - events = rs.get('audit_event_overview_report_rows', []) + events = fetch_audit_events( + params, + audit_filter=query_filter, + columns=['to_username'], + aggregate=['last_created'], + report_type='span', + limit=API_EVENT_SUMMARY_ROW_LIMIT + ) for event in events: username = (event.get('to_username') or '').lower() ts = int(event.get('last_created') or 0) diff --git a/keepercommander/commands/base.py b/keepercommander/commands/base.py index c56194698..953bf9d86 100644 --- a/keepercommander/commands/base.py +++ b/keepercommander/commands/base.py @@ -218,6 +218,7 @@ def register_enterprise_commands(commands, aliases, command_info): commands['epm'] = pedm_command command_info['epm'] = pedm_command.description aliases['pedm'] = 'epm' + aliases['kepm'] = 'epm' def register_msp_commands(commands, aliases, command_info): diff --git a/keepercommander/commands/discover/__init__.py b/keepercommander/commands/discover/__init__.py index 247a2e07d..24d163549 100644 --- a/keepercommander/commands/discover/__init__.py +++ b/keepercommander/commands/discover/__init__.py @@ -1,5 +1,7 @@ from __future__ import annotations import logging +import os + from ..base import Command from ..pam.config_facades import PamConfigurationRecordFacade from ..pam.router_helper import get_response_payload @@ -11,8 +13,11 @@ from ...crypto import encrypt_aes_v2, decrypt_aes_v2 from ...display import bcolors from ...discovery_common.constants import PAM_USER, PAM_MACHINE, PAM_DATABASE, PAM_DIRECTORY +from ...utils import value_to_boolean import json import base64 +import re + from typing import List, Optional, Union, Callable, Tuple, Any, Dict, TYPE_CHECKING if TYPE_CHECKING: @@ -64,7 +69,27 @@ def all_gateways(params: KeeperParams): return get_all_gateways(params) @staticmethod - def find_gateway(params: KeeperParams, find_func: Callable, gateways: Optional[List] = None) \ + def get_configuration_records(params) -> List[KeeperRecord]: + + """ + Get PAM configuration records. + + The default it to find all the record version 6 records. + If the environment variable `PAM_RECORD_TYPE_MATCH` is set to a true value, the search will use both record + versions 3 and 6, and then check the record type. + """ + + configuration_list = [] + if value_to_boolean(os.environ.get("PAM_RECORD_TYPE_MATCH")): + for record in list(vault_extensions.find_records(params, record_version=iter([3, 6]))): + if re.search(r"pam.+Configuration", record.record_type): + configuration_list.append(record) + else: + configuration_list = list(vault_extensions.find_records(params, record_version=6)) + return configuration_list + + @classmethod + def find_gateway(cls, params: KeeperParams, find_func: Callable, gateways: Optional[List] = None) \ -> Tuple[Optional[GatewayContext], Any]: """ @@ -76,8 +101,9 @@ def find_gateway(params: KeeperParams, find_func: Callable, gateways: Optional[L if gateways is None: gateways = GatewayContext.all_gateways(params) - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + configuration_records = cls.get_configuration_records(params) for configuration_record in configuration_records: + payload = find_func( configuration_record=configuration_record ) @@ -142,8 +168,8 @@ def from_gateway(params: KeeperParams, gateway: str, configuration_uid: Optional If there is only one gateway, then that gateway is used. """ - # Get all the PAM configuration records in the Vault; not Application - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + # Get all the PAM configuration records in the Vault; configurations are version 6 + configuration_records = GatewayContext.get_configuration_records(params=params) if configuration_uid: logging.debug(f"find the gateway with configuration record {configuration_uid}") diff --git a/keepercommander/commands/discover/job_status.py b/keepercommander/commands/discover/job_status.py index f9e3fd0f3..8ad2bddd3 100644 --- a/keepercommander/commands/discover/job_status.py +++ b/keepercommander/commands/discover/job_status.py @@ -306,7 +306,7 @@ def execute(self, params, **kwargs): # For each configuration/ gateway, we are going to get all jobs. # We are going to query the gateway for any updated status. - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + configuration_records = GatewayContext.get_configuration_records(params=params) for configuration_record in configuration_records: gateway_context = GatewayContext.from_configuration_uid( diff --git a/keepercommander/commands/discover/result_process.py b/keepercommander/commands/discover/result_process.py index c9ae11118..4f44b0d21 100644 --- a/keepercommander/commands/discover/result_process.py +++ b/keepercommander/commands/discover/result_process.py @@ -4,7 +4,7 @@ import json import sys import os.path - +import re from keeper_secrets_manager_core.utils import url_safe_str_to_bytes from . import PAMGatewayActionDiscoverCommandBase, GatewayContext from ..pam.router_helper import (router_get_connected_gateways, router_set_record_rotation_information, @@ -23,7 +23,6 @@ from ...discovery_common.constants import VERTICES_SORT_MAP from pydantic import BaseModel from typing import Optional, List, Any, Tuple, Dict, TYPE_CHECKING - from ...api import get_records_add_request if TYPE_CHECKING: @@ -235,7 +234,7 @@ def _build_record_cache(self, params: KeeperParams, gateway_context: GatewayCont } # Set all the PAM Records - records = list(vault_extensions.find_records(params, "pam*")) + records = list(vault_extensions.find_records(params, "pam*", use_regex=True)) for record in records: # If the record type is not part of the cache, skip the record if record.record_type not in cache: @@ -1471,7 +1470,7 @@ def execute(self, params: KeeperParams, **kwargs): all_gateways = GatewayContext.all_gateways(params) - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + configuration_records = GatewayContext.get_configuration_records(params=params) for configuration_record in configuration_records: gateway_context = GatewayContext.from_configuration_uid(params=params, diff --git a/keepercommander/commands/enterprise.py b/keepercommander/commands/enterprise.py index 25c9bc212..09c4218ee 100644 --- a/keepercommander/commands/enterprise.py +++ b/keepercommander/commands/enterprise.py @@ -426,7 +426,7 @@ def execute(self, params, **kwargs): nodes[node_id] = { 'node_id': node_id, 'parent_id': node.get('parent_id') or 0, - 'name': node['data'].get('displayname') or '', + 'name': (node['data'].get('displayname') or '') if node.get('parent_id') else params.enterprise['enterprise_name'], 'isolated': node.get('restrict_visibility') or False, 'users': [], 'teams': [], @@ -2177,7 +2177,8 @@ def execute(self, params, **kwargs): "node_id": node_id, "encrypted_data": utils.base64_url_encode(crypto.encrypt_aes_v1(data, tree_key)), "visible_below": (kwargs.get('visible_below') == 'on') or False, - "new_user_inherit": (kwargs.get('new_user') == 'on') or False + "new_user_inherit": (kwargs.get('new_user') == 'on') or False, + "role_name": role_name } request_batch.append(rq) @@ -2775,7 +2776,7 @@ def execute(self, params, **kwargs): command = rq.get('command') if command == 'role_add': if rs['result'] == 'success': - logging.info('Role created') + logging.info('%s Role created with Role ID : %s', rq['role_name'], rq['role_id']) else: logging.warning('Failed to create role: %s', rs['message']) else: @@ -3507,7 +3508,10 @@ def execute(self, params, **kwargs): if command in { 'team_add', 'team_delete', 'team_update' }: verb = 'created' if command == 'team_add' else 'deleted' if command == 'team_delete' else 'updated' if rs['result'] == 'success': - logging.info('\'%s\' team is %s', team_name, verb) + if command == 'team_add': + logging.info('\'%s\' team is %s with Team ID: %s', team_name, verb, rq.get('team_uid')) + else: + logging.info('\'%s\' team is %s', team_name, verb) else: logging.warning('\'%s\' team is not %s: %s', team_name, verb, rs['message']) elif command in {'team_enterprise_user_add', 'team_queue_user', 'team_enterprise_user_remove'}: diff --git a/keepercommander/commands/enterprise_common.py b/keepercommander/commands/enterprise_common.py index bab17dc19..acf5663d8 100644 --- a/keepercommander/commands/enterprise_common.py +++ b/keepercommander/commands/enterprise_common.py @@ -27,6 +27,7 @@ def __init__(self): self.public_keys = {} self.team_keys = {} self._node_map = None + self._node_map_enterprise_name = None def execute_args(self, params, args, **kwargs): if params.enterprise: @@ -291,7 +292,9 @@ def get_enterprise_ids(params, num_ids=1): return enterprise_ids def get_node_path(self, params, node_id, omit_root=False): - if self._node_map is None: + current_enterprise_name = params.enterprise.get('enterprise_name') + if self._node_map is None or self._node_map_enterprise_name != current_enterprise_name: + self._node_map_enterprise_name = current_enterprise_name self._node_map = { x['node_id']: (x['data'].get('displayname') or x.get('name') or str(x['node_id']) if x.get('parent_id', 0) > 0 else params.enterprise['enterprise_name'], x.get('parent_id', 0)) for x in params.enterprise['nodes']} diff --git a/keepercommander/commands/pam_debug/info.py b/keepercommander/commands/pam_debug/info.py index 49be65f8d..9f3208ddd 100644 --- a/keepercommander/commands/pam_debug/info.py +++ b/keepercommander/commands/pam_debug/info.py @@ -44,7 +44,7 @@ def execute(self, params: KeeperParams, **kwargs): return if record.record_type not in ["pamUser", "pamMachine", "pamDatabase", "pamDirectory"]: - if re.search(r'^pam.*Configuration$', record.record_type) is None: + if re.search(r'^pam.+Configuration$', record.record_type) is None: print(f"{bcolors.FAIL}The record is a {record.record_type}. This is not a PAM record.{bcolors.ENDC}") return @@ -58,11 +58,13 @@ def execute(self, params: KeeperParams, **kwargs): print(f"{bcolors.WARNING}PAM record does not have protobuf rotation settings, " f"checking all configurations.{bcolors.ENDC}") - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + # Get all the PAM configuration records in the Vault; configurations are version 6 + configuration_records = GatewayContext.get_configuration_records(params=params) if len(configuration_records) == 0: print(f"{bcolors.FAIL}Cannot find any PAM configuration records in the Vault{bcolors.ENDC}") for configuration_record in configuration_records: + record_link = RecordLink(record=configuration_record, params=params) record_vertex = record_link.dag.get_vertex(record.record_uid) if record_vertex is not None and record_vertex.active is True: diff --git a/keepercommander/commands/pam_debug/rotation_setting.py b/keepercommander/commands/pam_debug/rotation_setting.py index 41e4a034f..8e475b487 100644 --- a/keepercommander/commands/pam_debug/rotation_setting.py +++ b/keepercommander/commands/pam_debug/rotation_setting.py @@ -74,7 +74,7 @@ def execute(self, params: KeeperParams, **kwargs): print(f"{bcolors.FAIL}Configuration record does not exists.{bcolors.ENDC}") return - if re.search(r'^pam.*Configuration$', configuration_record.record_type) is None: + if re.search(r'^pam.+Configuration$', configuration_record.record_type) is None: print( f"{bcolors.FAIL}The configuration record is not a configuration record. " f"It's {configuration_record.record_type} record.{bcolors.ENDC}") diff --git a/keepercommander/commands/pedm/pedm_admin.py b/keepercommander/commands/pedm/pedm_admin.py index d63ec4f93..f029bf93f 100644 --- a/keepercommander/commands/pedm/pedm_admin.py +++ b/keepercommander/commands/pedm/pedm_admin.py @@ -205,8 +205,8 @@ def __init__(self): ad_parser.add_argument('--ad-user', dest='ad_user', required=True, help='AD bind user (DOMAIN\\username or DN)') ad_parser.add_argument('--ad-password', dest='ad_password', help='AD password') ad_parser.add_argument('--group', dest='groups', action='append', help='AD group name or DN (repeatable)') - ad_parser.add_argument('--netbios-domain', dest='use_netbios_domain', action='store_true', - help='Use NetBIOS domain names (e.g., TEST) instead of DNS names (e.g., test.local)') + ad_parser.add_argument('--ad-domain', dest='ad_domain', action='store', choices=['netbios', 'dns'], + help='Use NetBIOS domain names (e.g., TEST) or DNS names (e.g., test.local)') for subparser in subparsers.choices.values(): subparser.exit = base.suppress_exit @@ -311,18 +311,14 @@ def execute(self, context: KeeperParams, **kwargs): if groups: kwargs['groups'] = groups - custom_field = config_record.get_typed_field(field_type=None, label='NetBIOS Domain') + custom_field = config_record.get_typed_field(field_type=None, label='AD Domain') if custom_field: - netbios_domain = custom_field.get_default_value(bool) - if netbios_domain is None: - custom_value = custom_field.get_default_value(str) - if custom_value: - try: - netbios_domain = bool(custom_value) - except: - pass + netbios_domain = custom_field.get_default_value(str) + if netbios_domain: + if netbios_domain not in ('netbios', 'dns'): + netbios_domain = None if netbios_domain is True: - kwargs['use_netbios_domain'] = netbios_domain + kwargs['ad_domain'] = netbios_domain else: raise base.CommandError(f'Record "{config_record.title}" does not contain either "Azure Tenant ID" or "AD URL" value') @@ -332,10 +328,11 @@ def execute(self, context: KeeperParams, **kwargs): ad_user = kwargs.get('ad_user') ad_password = kwargs.get('ad_password') scim_groups = kwargs.get('groups') - use_netbios_domain = kwargs.get('use_netbios_domain', False) + ad_domain = kwargs.get('ad_domain') or 'netbios' if scim_groups and not isinstance(scim_groups, list): scim_groups = None + use_netbios_domain = ad_domain != 'dns' if not ad_url or not ad_user: raise base.CommandError('AD source requires AD URL and AD User') try: diff --git a/keepercommander/commands/record.py b/keepercommander/commands/record.py index c7a901f0c..6fcc91329 100644 --- a/keepercommander/commands/record.py +++ b/keepercommander/commands/record.py @@ -122,12 +122,14 @@ def register_command_info(aliases, command_info): get_info_parser.add_argument('uid', type=str, action='store', help='UID or title to search for') -search_parser = argparse.ArgumentParser(prog='search', description='Search the vault using a regular expression') -search_parser.add_argument('pattern', nargs='?', type=str, action='store', help='search pattern') +search_parser = argparse.ArgumentParser(prog='search', description='Search the vault. Words can be in any order.') +search_parser.add_argument('pattern', nargs='*', type=str, action='store', help='search terms (space-separated, order independent)') search_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='verbose output') search_parser.add_argument('-c', '--categories', dest='categories', action='store', help='One or more of these letters for categories to search: "r" = records, ' '"s" = shared folders, "t" = teams') +search_parser.add_argument('--regex', dest='regex', action='store_true', + help='treat pattern as a regular expression instead of space-separated search terms') search_parser.add_argument('--format', dest='format', action='store', choices=['table', 'json'], default='table', help='output format') @@ -1114,9 +1116,17 @@ def get_parser(self): return search_parser def execute(self, params, **kwargs): - pattern = kwargs.get('pattern') or '' + pattern_args = kwargs.get('pattern') or [] + # Join multiple words into a single pattern string + pattern = ' '.join(pattern_args) if isinstance(pattern_args, list) else (pattern_args or '') + use_regex = kwargs.get('regex', False) + + # Handle wildcard: '*' means match all if pattern == '*': - pattern = '.*' + if use_regex: + pattern = '.*' + else: + pattern = '' # Empty pattern matches all in token mode categories = (kwargs.get('categories') or 'rst').lower() verbose = kwargs.get('verbose') is True @@ -1127,7 +1137,7 @@ def execute(self, params, **kwargs): if 'r' in categories: - records = list(vault_extensions.find_records(params, pattern, record_version=None if verbose else [2,3])) + records = list(vault_extensions.find_records(params, pattern, record_version=None if verbose else [2,3], use_regex=use_regex)) if records: if fmt == 'json': for record in records: @@ -1157,7 +1167,7 @@ def execute(self, params, **kwargs): # Search shared folders if 's' in categories: - results = api.search_shared_folders(params, pattern) + results = api.search_shared_folders(params, pattern, use_regex=use_regex) if results: if fmt == 'json': for sf in results: @@ -1175,7 +1185,7 @@ def execute(self, params, **kwargs): # Search teams if 't' in categories: - results = api.search_teams(params, pattern) + results = api.search_teams(params, pattern, use_regex=use_regex) if results: if fmt == 'json': for team in results: diff --git a/keepercommander/commands/supershell/handlers/keyboard.py b/keepercommander/commands/supershell/handlers/keyboard.py index 6ef3d03e1..f83394827 100644 --- a/keepercommander/commands/supershell/handlers/keyboard.py +++ b/keepercommander/commands/supershell/handlers/keyboard.py @@ -10,6 +10,25 @@ from rich.markup import escape as rich_escape +# Debug logging - writes to /tmp/supershell_debug.log when enabled +DEBUG_EVENTS = False +_debug_log_file = None + +def _debug_log(msg: str): + """Log debug message to /tmp/supershell_debug.log if DEBUG_EVENTS is True.""" + if not DEBUG_EVENTS: + return + global _debug_log_file + try: + if _debug_log_file is None: + _debug_log_file = open('/tmp/supershell_debug.log', 'a') + import datetime + timestamp = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3] + _debug_log_file.write(f"[{timestamp}] {msg}\n") + _debug_log_file.flush() + except Exception: + pass + if TYPE_CHECKING: from textual.events import Key from textual.widgets import Tree @@ -141,117 +160,230 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: class ShellInputHandler(KeyHandler): - """Handles input when shell pane is visible and active.""" + """Placeholder for shell input key handling. + + Note: All key handling is now done by ShellInputTextArea itself, + including Tab/Shift+Tab focus cycling. This handler is kept for + potential future use but currently does not handle any keys. + """ def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: - # Handle Enter key whenever shell pane is visible (even if not actively focused on input) - if app.shell_pane_visible and event.key == "enter": - return True - return app.shell_pane_visible and app.shell_input_active + # ShellInputTextArea handles all keys directly + return False def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: - tree = app.query_one("#folder_tree") + return False - if event.key == "ctrl+d": - app._close_shell_pane() - self._stop_event(event) - return True - if event.key == "enter": - # Ensure shell input is active when Enter is pressed - if not app.shell_input_active: - app.shell_input_active = True - app._update_shell_input_display() - # Execute command (even if empty, to show new prompt) - app._execute_shell_command(app.shell_input_text) - app.shell_input_text = "" - app._update_shell_input_display() +class ShellPaneCloseHandler(KeyHandler): + """Handles Ctrl+D to close shell when not focused on shell output (which uses Ctrl+D for page down).""" + + def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + if not (app.shell_pane_visible and event.key == "ctrl+d"): + return False + # Don't intercept Ctrl+D when shell output has focus (it's used for page down there) + try: + from textual.widgets import TextArea + shell_output = app.query_one("#shell_output_content", TextArea) + if shell_output.has_focus: + return False + except Exception: + pass + return True + + def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + app._close_shell_pane() + self._stop_event(event) + return True + + +class ShellCopyHandler(KeyHandler): + """Handles Ctrl+C/Cmd+C to copy selected text, Ctrl+Shift+C/Cmd+Shift+C to copy all shell output.""" + + # Keys that trigger copy selected text + COPY_KEYS = ("ctrl+c", "cmd+c") + # Keys that trigger copy all output + COPY_ALL_KEYS = ("ctrl+shift+c", "cmd+shift+c") + + def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + all_copy_keys = self.COPY_KEYS + self.COPY_ALL_KEYS + result = app.shell_pane_visible and event.key in all_copy_keys + _debug_log(f"ShellCopyHandler.can_handle: shell_visible={app.shell_pane_visible} " + f"key={event.key!r} result={result}") + return result + + def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + _debug_log(f"ShellCopyHandler.handle: key={event.key!r}") + import pyperclip + + # Ctrl+C or Cmd+C: Copy selected text from TextArea + if event.key in self.COPY_KEYS: + try: + from textual.widgets import TextArea + shell_output = app.query_one("#shell_output_content", TextArea) + selected = shell_output.selected_text + _debug_log(f"ShellCopyHandler.handle: selected_text={selected!r}") + + if selected and selected.strip(): + pyperclip.copy(selected) + preview = selected[:40] + ('...' if len(selected) > 40 else '') + preview = preview.replace('\n', ' ') + app.notify(f"Copied: {preview}", severity="information") + _debug_log(f"ShellCopyHandler.handle: Copied selected text") + self._stop_event(event) + return True + else: + _debug_log(f"ShellCopyHandler.handle: No text selected, not handling") + return False # Let event propagate if nothing selected + except Exception as e: + _debug_log(f"ShellCopyHandler.handle: Error getting selection: {e}") + return False + + # Ctrl+Shift+C or Cmd+Shift+C: Copy all shell output + if event.key in self.COPY_ALL_KEYS: + import re + lines = [] + _debug_log(f"ShellCopyHandler.handle: shell_history has {len(app.shell_history)} entries") + for cmd, output in app.shell_history: + lines.append(f"> {cmd}") + if output.strip(): + lines.append(output) + lines.append("") + + raw_text = '\n'.join(lines) + clean_text = re.sub(r'\x1b\[[0-9;]*m', '', raw_text) + clean_text = re.sub(r'\[[^\]]*\]', '', clean_text) + + if clean_text.strip(): + try: + pyperclip.copy(clean_text.strip()) + app.notify("All shell output copied", severity="information") + _debug_log(f"ShellCopyHandler.handle: Copied all output") + except Exception as e: + _debug_log(f"ShellCopyHandler.handle: Copy failed: {e}") + app.notify("Copy failed", severity="warning") + else: + app.notify("No output to copy", severity="information") + self._stop_event(event) return True - if event.key == "backspace": - if app.shell_input_text: - app.shell_input_text = app.shell_input_text[:-1] - app._update_shell_input_display() + return False + + +class ShellOutputHandler(KeyHandler): + """Handles keyboard events when shell output pane has focus. + + Provides vim-style scrolling (j/k, Ctrl+d/u) and Tab cycling + when the terminal output pane is focused. + """ + + def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + if not app.shell_pane_visible: + return False + try: + from textual.widgets import TextArea + shell_output = app.query_one("#shell_output_content", TextArea) + return shell_output.has_focus + except Exception: + return False + + def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + from textual.widgets import Tree, TextArea + from textual.containers import VerticalScroll + + try: + shell_output = app.query_one("#shell_output_content", TextArea) + except Exception: + return False + + # Tab cycles to Shell Input + if event.key == "tab": + app.shell_input_active = True + try: + shell_input = app.query_one("#shell_input_area") + shell_input.focus() + except Exception: + pass + app._update_status("Shell input | Tab to search | Shift+Tab to output") self._stop_event(event) return True - if event.key == "ctrl+u": - # Ctrl+U clears the input line (like bash) - app.shell_input_text = "" - app._update_shell_input_display() + # Shift+Tab cycles to Detail pane + if event.key == "shift+tab": + detail_scroll = app.query_one("#record_detail", VerticalScroll) + detail_scroll.focus() + app._update_status("Detail pane | Tab to shell output | Shift+Tab to tree") self._stop_event(event) return True + # Escape goes back to tree if event.key == "escape": - app.shell_input_active = False + tree = app.query_one("#folder_tree", Tree) tree.focus() - app._update_shell_input_display() - app._update_status("Shell open | Tab to cycle | press Enter in shell to run commands") + app._update_status("Navigate with j/k | Tab to detail | ? for help") self._stop_event(event) return True - if event.key == "tab": - # Shell → Search input - app.shell_input_active = False - app._update_shell_input_display() - app.search_input_active = True - tree.add_class("search-input-active") - app._update_search_display(perform_search=False) - app._update_status("Type to search | Tab to tree | Ctrl+U to clear") + # Vim-style scrolling: j = down, k = up + if event.key == "j" or event.key == "down": + shell_output.scroll_relative(y=1) self._stop_event(event) return True - if event.key == "shift+tab": - # Shell → Detail pane - from textual.containers import VerticalScroll - app.shell_input_active = False - app._update_shell_input_display() - detail_scroll = app.query_one("#record_detail", VerticalScroll) - detail_scroll.focus() - app._update_status("Detail pane | Tab to shell | Shift+Tab to tree") + if event.key == "k" or event.key == "up": + shell_output.scroll_relative(y=-1) self._stop_event(event) return True - if event.key == "up": - if app.shell_command_history: - if app.shell_history_index < len(app.shell_command_history) - 1: - app.shell_history_index += 1 - app.shell_input_text = app.shell_command_history[-(app.shell_history_index + 1)] - app._update_shell_input_display() + # Ctrl+D = half page down + if event.key == "ctrl+d": + shell_output.scroll_relative(y=10) self._stop_event(event) return True - if event.key == "down": - if app.shell_history_index > 0: - app.shell_history_index -= 1 - app.shell_input_text = app.shell_command_history[-(app.shell_history_index + 1)] - elif app.shell_history_index == 0: - app.shell_history_index = -1 - app.shell_input_text = "" - app._update_shell_input_display() + # Ctrl+U = half page up + if event.key == "ctrl+u": + shell_output.scroll_relative(y=-10) self._stop_event(event) return True - if event.character and event.character.isprintable(): - app.shell_input_text += event.character - app._update_shell_input_display() + # Ctrl+F = full page down (vim) + if event.key == "ctrl+f": + shell_output.scroll_relative(y=20) self._stop_event(event) return True - return False + # Ctrl+B = full page up (vim) + if event.key == "ctrl+b": + shell_output.scroll_relative(y=-20) + self._stop_event(event) + return True + # Ctrl+E = scroll down one line (vim) + if event.key == "ctrl+e": + shell_output.scroll_relative(y=1) + self._stop_event(event) + return True -class ShellPaneCloseHandler(KeyHandler): - """Handles Ctrl+D to close shell even when not focused on input.""" + # Ctrl+Y = scroll up one line (vim) + if event.key == "ctrl+y": + shell_output.scroll_relative(y=-1) + self._stop_event(event) + return True - def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: - return app.shell_pane_visible and event.key == "ctrl+d" + # g = go to top, G = go to bottom + if event.key == "g": + shell_output.scroll_home() + self._stop_event(event) + return True - def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: - app._close_shell_pane() - self._stop_event(event) - return True + if event.character == "G": + shell_output.scroll_end() + self._stop_event(event) + return True + + return False class SearchInputTabHandler(KeyHandler): @@ -270,6 +402,8 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: app.search_input_active = False tree.remove_class("search-input-active") + search_bar = app.query_one("#search_bar") + search_bar.remove_class("search-active") if app.search_input_text: search_display.update(rich_escape(app.search_input_text)) @@ -281,11 +415,15 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: tree.focus() app._update_status("Navigate with j/k | Tab to detail | ? for help") else: - # Shift+Tab: Search input → Shell (if visible) or Detail pane + # Shift+Tab: Search input → Shell Input (if visible) or Detail pane if app.shell_pane_visible: app.shell_input_active = True - app._update_shell_input_display() - app._update_status("Shell | Shift+Tab to detail | Tab to search") + try: + shell_input = app.query_one("#shell_input_area") + shell_input.focus() + except Exception: + pass + app._update_status("Shell input | Tab to search | Shift+Tab to output") else: detail_scroll.focus() app._update_status("Detail pane | Tab to search | Shift+Tab to tree") @@ -310,14 +448,20 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: detail_scroll = app.query_one("#record_detail", VerticalScroll) if event.key == "tab": - # Detail pane → Shell (if visible) or Search input + # Detail pane → Shell Output (if visible) or Search input if app.shell_pane_visible: - app.shell_input_active = True - app._update_shell_input_display() - app._update_status("Shell | Tab to search | Shift+Tab to detail") + try: + from textual.widgets import TextArea + shell_output = app.query_one("#shell_output_content", TextArea) + shell_output.focus() + except Exception: + pass + app._update_status("Shell output | j/k to scroll | Tab to input | Shift+Tab to detail") else: app.search_input_active = True tree.add_class("search-input-active") + search_bar = app.query_one("#search_bar") + search_bar.add_class("search-active") app._update_search_display(perform_search=False) app._update_status("Type to search | Tab to tree | Ctrl+U to clear") self._stop_event(event) @@ -410,7 +554,10 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: # Tab switches to detail pane if event.key == "tab": detail_scroll.focus() - app._update_status("Detail pane | Tab to search | Shift+Tab to tree") + if app.shell_pane_visible: + app._update_status("Detail pane | Tab to shell | Shift+Tab to tree") + else: + app._update_status("Detail pane | Tab to search | Shift+Tab to tree") self._stop_event(event) return True @@ -418,6 +565,8 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: if event.key == "shift+tab": app.search_input_active = True tree.add_class("search-input-active") + search_bar = app.query_one("#search_bar") + search_bar.add_class("search-active") app._update_search_display(perform_search=False) app._update_status("Type to search | Tab to tree | Ctrl+U to clear") self._stop_event(event) @@ -427,6 +576,8 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: if event.key == "slash": app.search_input_active = True tree.add_class("search-input-active") + search_bar = app.query_one("#search_bar") + search_bar.add_class("search-active") app._update_search_display(perform_search=False) self._stop_event(event) return True @@ -459,6 +610,8 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: if event.key == "slash" and not app.search_input_active: app.search_input_active = True tree.add_class("search-input-active") + search_bar = app.query_one("#search_bar") + search_bar.add_class("search-active") app._update_search_display(perform_search=False) self._stop_event(event) return True @@ -468,6 +621,8 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: app.search_input_text = "" app.search_input_active = False tree.remove_class("search-input-active") + search_bar = app.query_one("#search_bar") + search_bar.remove_class("search-active") app._perform_live_search("") search_display = app.query_one("#search_display") @@ -489,6 +644,8 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: # Move focus to tree to navigate results (only when typing in search) app.search_input_active = False tree.remove_class("search-input-active") + search_bar = app.query_one("#search_bar") + search_bar.remove_class("search-active") search_display = app.query_one("#search_display") if app.search_input_text: @@ -503,7 +660,7 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: self._stop_event(event) return True - if event.key == "backspace": + if event.key == "backspace" and app.search_input_active: if app.search_input_text: app.search_input_text = app.search_input_text[:-1] app._update_search_display() @@ -585,6 +742,8 @@ def __init__(self): CommandModeHandler(), ShellInputHandler(), ShellPaneCloseHandler(), + ShellCopyHandler(), + ShellOutputHandler(), # Tab cycling handlers SearchInputTabHandler(), @@ -610,9 +769,14 @@ def dispatch(self, event: 'Key', app: 'SuperShellApp') -> bool: True if the event was handled """ for handler in self.handlers: - if handler.can_handle(event, app): - if handler.handle(event, app): + can_handle = handler.can_handle(event, app) + if can_handle: + _debug_log(f"DISPATCH: {handler.__class__.__name__} can_handle=True for key={event.key!r}") + result = handler.handle(event, app) + _debug_log(f"DISPATCH: {handler.__class__.__name__} handle returned {result}") + if result: return True + _debug_log(f"DISPATCH: No handler for key={event.key!r}") return False diff --git a/keepercommander/commands/supershell/screens/help.py b/keepercommander/commands/supershell/screens/help.py index 0e13b95b0..2261c3a62 100644 --- a/keepercommander/commands/supershell/screens/help.py +++ b/keepercommander/commands/supershell/screens/help.py @@ -17,13 +17,26 @@ class HelpScreen(ModalScreen): DEFAULT_CSS = """ HelpScreen { align: center middle; + background: rgba(0, 0, 0, 0.8); + } + + HelpScreen > Vertical { + background: #000000; + } + + HelpScreen > Vertical > Horizontal { + background: #000000; + } + + HelpScreen Static { + background: #000000; } #help_container { width: 90; height: auto; max-height: 90%; - background: #111111; + background: #000000; border: solid #444444; padding: 1 2; } @@ -81,7 +94,7 @@ def compose(self) -> ComposeResult: Up/Down Command history quit/q Close shell pane Ctrl+D Close shell pane - Shift+drag Select text (native) + Select text Auto-copies to clipboard [green]General:[/green] ? Help diff --git a/keepercommander/service/commands/slack_app_setup.py b/keepercommander/service/commands/slack_app_setup.py index e8d15ff93..0b3f26d8a 100644 --- a/keepercommander/service/commands/slack_app_setup.py +++ b/keepercommander/service/commands/slack_app_setup.py @@ -155,7 +155,7 @@ def _get_slack_service_configuration(self) -> ServiceConfig: return ServiceConfig( port=port, - commands='search,share-record,share-folder,record-add,one-time-share,pedm,device-approve,get', + commands='search,share-record,share-folder,record-add,one-time-share,pedm,device-approve,get,server', queue_enabled=True, # Always enable queue mode (v2 API) ngrok_enabled=ngrok_config['ngrok_enabled'], ngrok_auth_token=ngrok_config['ngrok_auth_token'], diff --git a/keepercommander/service/config/file_handler.py b/keepercommander/service/config/file_handler.py index 957740b58..e7b4a798a 100644 --- a/keepercommander/service/config/file_handler.py +++ b/keepercommander/service/config/file_handler.py @@ -50,7 +50,7 @@ def get_config_format(self, save_type: str = None) -> str: from ..core.globals import get_current_params if params := get_current_params(): - if self.cli_handler.download_config_from_vault(params, 'Commander Service Mode', self.config_dir): + if self.cli_handler.download_config_from_vault(params, 'Commander Service Mode Config', self.config_dir): if json_path.exists(): self.encrypt_config_file(json_path, self.config_dir) return 'json' diff --git a/keepercommander/service/util/parse_keeper_response.py b/keepercommander/service/util/parse_keeper_response.py index 0ffdfc97d..a75c9067a 100644 --- a/keepercommander/service/util/parse_keeper_response.py +++ b/keepercommander/service/util/parse_keeper_response.py @@ -511,7 +511,7 @@ def _parse_mkdir_command(response: str) -> Dict[str, Any]: @staticmethod def _parse_record_add_command(response: str) -> Dict[str, Any]: - """Parse 'record-add' command output to extract record UID or handle errors.""" + """Parse 'record-add' command output to extract record UID, share URL, or handle errors.""" response_str = response.strip() # Check for error messages first @@ -533,14 +533,19 @@ def _parse_record_add_command(response: str) -> Dict[str, Any]: "data": None } - # Success case - try to extract UID + # Success case - check if it's a share URL (from --self-destruct) result = { "status": "success", "command": "record-add", "data": None } - if re.match(r'^[a-zA-Z0-9_-]+$', response_str): + # Check if response is a share URL (starts with https:// and contains /vault/share) + if response_str.startswith('https://') and '/vault/share' in response_str: + result["data"] = { + "share_url": response_str + } + elif re.match(r'^[a-zA-Z0-9_-]+$', response_str): result["data"] = { "record_uid": response_str } diff --git a/keepercommander/service/util/tunneling.py b/keepercommander/service/util/tunneling.py index 5c650de9f..afc8c95d7 100644 --- a/keepercommander/service/util/tunneling.py +++ b/keepercommander/service/util/tunneling.py @@ -185,6 +185,13 @@ def generate_ngrok_url(port, auth_token, ngrok_custom_domain, run_mode): if not port or not auth_token: raise ValueError("Both 'port' and 'ngrok_auth_token' must be provided.") + # Strip .ngrok.io suffix if user provided full domain (e.g., "mycompany.ngrok.io" -> "mycompany") + if ngrok_custom_domain: + for suffix in ['.ngrok.io', '.ngrok.app', '.ngrok-free.app']: + if ngrok_custom_domain.lower().endswith(suffix): + ngrok_custom_domain = ngrok_custom_domain[:-len(suffix)] + break + logging.getLogger("ngrok").setLevel(logging.CRITICAL) logging.getLogger("pyngrok").setLevel(logging.CRITICAL) diff --git a/keepercommander/vault_extensions.py b/keepercommander/vault_extensions.py index 118dae863..e974755cd 100644 --- a/keepercommander/vault_extensions.py +++ b/keepercommander/vault_extensions.py @@ -32,6 +32,73 @@ def _match_value(pattern, value): # type: (Callable[[str], Any], Any) -> bool return False +def _collect_searchable_text(value): # type: (Any) -> str + """Recursively collect all string values from a value (str, list, or dict).""" + if isinstance(value, str): + return value + elif isinstance(value, list): + parts = [] + for v in value: + parts.append(_collect_searchable_text(v)) + return ' '.join(parts) + elif isinstance(value, dict): + parts = [] + for v in value.values(): + parts.append(_collect_searchable_text(v)) + return ' '.join(parts) + return '' + + +def matches_record_tokens(record, tokens, params=None): + # type: (vault.KeeperRecord, list, Optional[KeeperParams]) -> bool + """ + Token-based search matching (SuperShell-style). + + All tokens must match somewhere in the record's searchable text. + Order doesn't matter: "aws prod" matches "production aws server". + + Searches: record UID, title, all field keys and values, and folder name. + """ + if not tokens: + return True + + # Build searchable text from all record fields + parts = [] + + # Record UID + parts.append(record.record_uid) + + # Title + if record.title: + parts.append(record.title) + + # All fields (keys and values) + for key, value in record.enumerate_fields(): + # Strip field type prefix like "(login)." + m = re.match(r'^\((\w+)\)\.?', key) + if m: + key = m.group(1) + if key: + parts.append(key) + if value: + parts.append(_collect_searchable_text(value)) + + # Get folder name if params provided + if params and hasattr(params, 'record_cache') and hasattr(params, 'folder_cache'): + # Find folder containing this record + for folder_uid, folder in params.folder_cache.items(): + if hasattr(folder, 'records') and record.record_uid in folder.records: + if hasattr(folder, 'name') and folder.name: + parts.append(folder.name) + break + + # Combine and lowercase for matching + combined_text = ' '.join(parts).lower() + + # All tokens must match somewhere + return all(token in combined_text for token in tokens) + + def matches_record(record, pattern, search_fields=None): # type: (vault.KeeperRecord, Union[str, Callable[[str], Any]], Optional[Iterable[str]]) -> bool if isinstance(pattern, str): pattern = re.compile(pattern, re.IGNORECASE).search @@ -56,9 +123,33 @@ def find_records(params, # type: KeeperParams search_str=None, # type: Optional[str] record_type=None, # type: Union[str, Iterable[str], None] record_version=None, # type: Union[int, Iterable[int], None] - search_fields=None # type: Optional[Iterable[str]] + search_fields=None, # type: Optional[Iterable[str]] + use_regex=False # type: bool ): # type: (...) -> Iterator[vault.KeeperRecord] - pattern = re.compile(search_str, re.IGNORECASE).search if search_str else None + """ + Search records in the vault. + + Args: + params: KeeperParams with record_cache + search_str: Search string (tokens or regex depending on use_regex) + record_type: Filter by record type(s) + record_version: Filter by record version(s) + search_fields: Limit search to specific fields (regex mode only) + use_regex: If True, treat search_str as regex. If False (default), + treat as space-separated tokens where all must match (any order). + """ + # Token-based search (default): split by whitespace, all tokens must match + search_tokens = None + pattern = None + + if search_str: + if use_regex: + pattern = re.compile(search_str, re.IGNORECASE).search + else: + # Token-based: split and lowercase + search_tokens = [t.lower() for t in search_str.split() if t.strip()] + if not search_tokens: + search_tokens = None type_filter = None # type: Optional[Set[str]] version_filter = None # type: Optional[Set[int]] @@ -88,7 +179,21 @@ def find_records(params, # type: KeeperParams if type_filter and record.record_type not in type_filter: continue - if pattern: + if search_tokens: + # Token-based search (default) + is_match = matches_record_tokens(record, search_tokens, params) + # Also check file attachments + if not is_match and isinstance(record, vault.TypedRecord): + field = record.get_typed_field('fileRef') + if field and isinstance(field.value, list): + for file_uid in field.value: + file_record = vault.KeeperRecord.load(params, file_uid) + if isinstance(file_record, vault.FileRecord): + is_match = matches_record_tokens(file_record, search_tokens, params) + if is_match: + break + elif pattern: + # Regex-based search (legacy, use --regex flag) is_match = matches_record(record, pattern, search_fields) if not is_match and isinstance(record, vault.TypedRecord): field = record.get_typed_field('fileRef') diff --git a/unit-tests/test_api.py b/unit-tests/test_api.py index a7eb40fff..fca96387f 100644 --- a/unit-tests/test_api.py +++ b/unit-tests/test_api.py @@ -49,7 +49,7 @@ def test_search_shared_folders(self): params = get_synced_params() sfs = api.search_shared_folders(params, '') - self.assertEqual(len(sfs), len(params.shared_folder_cache)) + self.assertEqual(len(sfs), 0) sfs = api.search_shared_folders(params, 'folder') self.assertEqual(len(sfs), len(params.shared_folder_cache)) @@ -64,7 +64,7 @@ def test_search_teams(self): params = get_synced_params() teams = api.search_teams(params, '') - self.assertEqual(len(teams), len(params.team_cache)) + self.assertEqual(len(teams), 0) teams = api.search_shared_folders(params, 'team') self.assertEqual(len(teams), len(params.shared_folder_cache))