diff --git a/keepercommander/__init__.py b/keepercommander/__init__.py
index e5ba75178..ba03096e4 100644
--- a/keepercommander/__init__.py
+++ b/keepercommander/__init__.py
@@ -6,8 +6,8 @@
# |_|
#
# Keeper Commander
-# Copyright 2023 Keeper Security Inc.
-# Contact: ops@keepersecurity.com
+# Copyright 2009-2026 Keeper Security Inc.
+# Contact: commander@keepersecurity.com
#
-__version__ = '17.2.6'
+__version__ = '17.2.7'
diff --git a/keepercommander/cli.py b/keepercommander/cli.py
index 0f374222b..05ba08aa9 100644
--- a/keepercommander/cli.py
+++ b/keepercommander/cli.py
@@ -74,7 +74,12 @@ def display_command_help(show_enterprise=False, show_shell=False, show_legacy=Fa
from colorama import Fore, Style
import shutil
- alias_lookup = {x[1]: x[0] for x in aliases.items()}
+ # Build a lookup of command -> list of aliases
+ alias_lookup = {}
+ for alias, command in aliases.items():
+ if command not in alias_lookup:
+ alias_lookup[command] = []
+ alias_lookup[command].append(alias)
DIM = Fore.WHITE # Use white for better readability (not too bright, not too dim)
# Get terminal width
@@ -144,8 +149,8 @@ def clean_description(desc):
else:
commands_in_category = sorted(categorized_commands[category], key=lambda x: x[0])
for cmd, description in commands_in_category:
- alias = alias_lookup.get(cmd) or ''
- alias_str = f' ({alias})' if alias else ''
+ aliases_list = alias_lookup.get(cmd) or []
+ alias_str = f' ({", ".join(sorted(aliases_list))})' if aliases_list else ''
cmd_display = f'{cmd}{alias_str}'
all_cmd_displays.append((category, cmd_display, description))
global_max_width = max(global_max_width, len(cmd_display))
diff --git a/keepercommander/commands/_supershell_impl.py b/keepercommander/commands/_supershell_impl.py
index 8ddaef6ba..7c0676c68 100644
--- a/keepercommander/commands/_supershell_impl.py
+++ b/keepercommander/commands/_supershell_impl.py
@@ -4201,7 +4201,7 @@ def _update_shell_header(self):
f"[{t['text_dim']}] (Enter to run | Up/Down for history | Ctrl+D to close)[/{t['text_dim']}]"
)
- def check_action(self, action: str, parameters: tuple) -> bool | None:
+ def check_action(self, action: str, parameters: tuple) -> Optional[bool]:
"""Control whether actions are enabled based on search state"""
# When search input is active, disable all bindings except escape and search
# This allows keys to be captured as text input instead of triggering actions
diff --git a/keepercommander/commands/discoveryrotation.py b/keepercommander/commands/discoveryrotation.py
index 40b989875..11802a5a4 100644
--- a/keepercommander/commands/discoveryrotation.py
+++ b/keepercommander/commands/discoveryrotation.py
@@ -2187,6 +2187,10 @@ class PAMConfigurationNewCommand(Command, PamConfigurationEditMixin):
help='Set recording connections permissions for the resource')
parser.add_argument('--typescript-recording', '-tr', dest='typescriptrecording', choices=choices,
help='Set TypeScript recording permissions for the resource')
+ parser.add_argument('--ai-threat-detection', dest='ai_threat_detection', choices=choices,
+ help='Set AI threat detection permissions')
+ parser.add_argument('--ai-terminate-session-on-detection', dest='ai_terminate_session_on_detection', choices=choices,
+ help='Set AI session termination on threat detection permissions')
def __init__(self):
super().__init__()
@@ -2274,7 +2278,9 @@ def execute(self, params, **kwargs):
kwargs.get('rotation'),
kwargs.get('recording'),
kwargs.get('typescriptrecording'),
- kwargs.get('remotebrowserisolation')
+ kwargs.get('remotebrowserisolation'),
+ kwargs.get('ai_threat_detection'),
+ kwargs.get('ai_terminate_session_on_detection')
)
if admin_cred_ref:
tmp_dag.link_user_to_config_with_options(admin_cred_ref, is_admin='on')
diff --git a/keepercommander/commands/pam_debug/info.py b/keepercommander/commands/pam_debug/info.py
index 9f3208ddd..11d1e8cac 100644
--- a/keepercommander/commands/pam_debug/info.py
+++ b/keepercommander/commands/pam_debug/info.py
@@ -505,31 +505,42 @@ def _print_field(f):
print(f" {self._b('Provider Group')}: {content.item.provider_group}")
print(f" {self._b('Allows Admin')}: {content.item.allows_admin}")
print(f" {self._b('Admin Reason')}: {content.item.admin_reason}")
+ else:
+ for k, v in content.item:
+ print(f" {self._b(k)}: {v}")
- print("")
- print(self._h("Belongs To Vertices (Parents)"))
- vertices = discovery_vertex.belongs_to_vertices()
- for vertex in vertices:
- content = DiscoveryObject.get_discovery_object(vertex)
- print(f" * {content.description} ({vertex.uid})")
- for edge_type in [EdgeType.LINK, EdgeType.ACL, EdgeType.KEY, EdgeType.DELETION]:
- edge = discovery_vertex.get_edge(vertex, edge_type=edge_type)
- if edge is not None:
- print(f" . {edge_type}, active: {edge.active}")
-
- if len(vertices) == 0:
- print(f"{bcolors.FAIL} Does not belong to anyone{bcolors.ENDC}")
+    # Configuration records do not belong to other records, so skip the parents section.
+ if record.version != 6:
+ print("")
+ print(self._h("Belongs To Vertices (Parents)"))
+ vertices = discovery_vertex.belongs_to_vertices()
+ for vertex in vertices:
+ try:
+ content = DiscoveryObject.get_discovery_object(vertex)
+ print(f" * {content.description} ({vertex.uid})")
+ for edge_type in [EdgeType.LINK, EdgeType.ACL, EdgeType.KEY, EdgeType.DELETION]:
+ edge = discovery_vertex.get_edge(vertex, edge_type=edge_type)
+ if edge is not None:
+ print(f" . {edge_type}, active: {edge.active}")
+ except Exception as err:
+ print(f"{bcolors.FAIL}Could not get belongs to information: {err}{bcolors.ENDC}")
+
+ if len(vertices) == 0:
+ print(f"{bcolors.FAIL} Does not belong to anyone{bcolors.ENDC}")
print("")
print(f"{bcolors.HEADER}Vertices Belonging To (Children){bcolors.ENDC}")
vertices = discovery_vertex.has_vertices()
for vertex in vertices:
- content = DiscoveryObject.get_discovery_object(vertex)
- print(f" * {content.description} ({vertex.uid})")
- for edge_type in [EdgeType.LINK, EdgeType.ACL, EdgeType.KEY, EdgeType.DELETION]:
- edge = vertex.get_edge(discovery_vertex, edge_type=edge_type)
- if edge is not None:
- print(f" . {edge_type}, active: {edge.active}")
+ try:
+ content = DiscoveryObject.get_discovery_object(vertex)
+ print(f" * {content.description} ({vertex.uid})")
+ for edge_type in [EdgeType.LINK, EdgeType.ACL, EdgeType.KEY, EdgeType.DELETION]:
+ edge = vertex.get_edge(discovery_vertex, edge_type=edge_type)
+ if edge is not None:
+ print(f" . {edge_type}, active: {edge.active}")
+ except Exception as err:
+ print(f"{bcolors.FAIL}Could not get belonging to information: {err}{bcolors.ENDC}")
if len(vertices) == 0:
print(f" Does not have any children.")
diff --git a/keepercommander/commands/pam_import/README.md b/keepercommander/commands/pam_import/README.md
index b98936506..97db7271b 100644
--- a/keepercommander/commands/pam_import/README.md
+++ b/keepercommander/commands/pam_import/README.md
@@ -83,6 +83,8 @@ _You can have only one `pam_configuration` section and the only required paramet
"connections": "on",
"rotation": "on",
"tunneling": "on",
+ "ai_threat_detection": "off",
+ "ai_terminate_session_on_detection": "off",
"remote_browser_isolation": "on",
"graphical_session_recording": "off",
"text_session_recording": "off",
@@ -201,6 +203,109 @@ Each Machine (pamMachine, pamDatabase, pamDirectory) can specify admin user whic
> **Note 1:** `pam_settings` _(options, connection)_ are explained only in pamMachine section below (per protocol) but they are present in all machine types.
> **Note 2:** `attachments` and `scripts` examples are in `pam_configuration: local` section.
> **Note 3:** Post rotation scripts (a.k.a. `scripts`) are executed in following order: `pamUser` scripts after any **successful** rotation for that user, `pamMachine` scripts after any **successful** rotation on the machine and `pamConfiguration` scripts after any rotation using that configuration.
+
+The JIT and KeeperAI settings below apply to all machine-type resources (pamMachine, pamDatabase, pamDirectory); they do not apply to pamUser or RBI (pamRemoteBrowser) records.
+
+
+Just-In-Time Access (JIT)
+
+[Just-In-Time Access (JIT)](https://docs.keeper.io/en/keeperpam/privileged-access-manager/getting-started/just-in-time-access-jit) - By implementing JIT access controls, organizations can significantly reduce their attack surface by ensuring that privileged access is only granted when needed, for the duration required, and with appropriate approvals.
+
+**How to Configure:** The import JSON mirrors the Keeper Vault web UI (the JIT tab on resource records). Configure the elevation settings (ephemeral account or group/role elevation) in `pam_settings.options.jit_settings`. Use `pam_directory_record` to reference a pamDirectory by its `title` from `pam_data.resources[]` (required for the domain account type):
+
+```json
+{
+ "jit_settings": {
+ "create_ephemeral": true,
+ "elevate": true,
+ "_comment_method": "elevation methods: ",
+ "elevation_method": "group",
+ "elevation_string": "arn:aws:iam::12345:role/Admin",
+ "base_distinguished_name": "OU=Users,DC=example,DC=net",
+ "_comment_ephemeral_account_types": "",
+ "ephemeral_account_type": "linux",
+ "_comment_pam_directory_record": "by title, requried if ephemeral_account_type: domain",
+ "pam_directory_record": "PAM AD1"
+ }
+}
+```
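+
+The referenced pamDirectory is matched by its `title` among `pam_data.resources[]` and resolved to a record UID during import. A minimal sketch of such a resource entry (hypothetical values; other required fields omitted):
+
+```json
+{
+  "type": "pamDirectory",
+  "title": "PAM AD1",
+  "_comment": "referenced above via jit_settings.pam_directory_record (matched by title)"
+}
+```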
+
+
+KeeperAI
+
+[KeeperAI](https://docs.keeper.io/en/keeperpam/privileged-access-manager/keeperai) - an agentic, AI-powered threat detection system that automatically monitors and analyzes KeeperPAM privileged sessions to identify suspicious or malicious behavior.
+
+**PAM Configuration Settings** (in `pam_configuration`; a minimal sketch follows this list):
+- `ai_threat_detection`
+- `ai_terminate_session_on_detection`
+
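+A minimal, partial sketch of enabling both at the configuration level (other `pam_configuration` parameters omitted; the fuller `pam_configuration` example earlier in this README shows them alongside the other options):
+
+```json
+{
+  "pam_configuration": {
+    "ai_threat_detection": "on",
+    "ai_terminate_session_on_detection": "on"
+  }
+}
+```
+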
+**Activating Threat Detection on a Resource:** The import JSON mirrors the Keeper Vault web UI (the AI tab on resource records). Session recording (graphical and/or text) must be enabled for KeeperAI to work. In the PAM Settings of the selected resource, enable `ai_threat_detection` and `ai_terminate_session_on_detection` in `pam_settings.options`, then add `pam_settings.options.ai_settings` with your risk-level rules:
+
+```json
+{
+ "pam_settings": {
+ "options": {
+ "graphical_session_recording": "on",
+ "text_session_recording": "on",
+ "ai_threat_detection": "on",
+ "ai_terminate_session_on_detection": "on",
+ "ai_settings": {
+ "risk_levels": {
+ "critical": {
+ "ai_session_terminate": true,
+ "activities": {
+ "allow": [
+ {"tag": "mount"},
+ {"tag": "umount"}
+ ],
+ "deny": [
+ {"tag": "iptables"},
+ {"tag": "wget | sh"}
+ ]
+ }
+ },
+ "high": {
+ "ai_session_terminate": true,
+ "activities": {
+ "allow": [
+ {"tag": "\\bmount\\b"},
+ {"tag": "\\bumount\\b"}
+ ],
+ "deny": [
+ {"tag": "kill -9"},
+ {"tag": "\\bkill\\s+-9\\b.*"}
+ ]
+ }
+ },
+ "medium": {
+ "ai_session_terminate": true,
+ "activities": {
+ "allow": [
+ {"tag": "chmod"},
+ {"tag": "chown"}
+ ],
+ "deny": [
+ {"tag": "bash"},
+ {"tag": "dash"}
+ ]
+ }
+ },
+ "low": {
+ "ai_session_terminate": false,
+ "activities": {
+ "allow": [
+ {"tag": "\\bwget\\b"},
+ {"tag": "\\bchmod\\b"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+}
+```
+
pam_data.resources.pamMachine (RDP)
@@ -211,6 +316,7 @@ Each Machine (pamMachine, pamDatabase, pamDirectory) can specify admin user whic
"notes": "RDP Machine1",
"host": "127.0.0.1",
"port": "3389",
+ "_comment_port": "administrative port",
"ssl_verification" : true,
"operating_system": "Windows",
"instance_name": "InstanceName",
@@ -227,16 +333,22 @@ Each Machine (pamMachine, pamDatabase, pamDirectory) can specify admin user whic
"tunneling": "on",
"remote_browser_isolation": "on",
"graphical_session_recording": "on",
+ "ai_threat_detection": "off",
+ "ai_terminate_session_on_detection": "off",
+ "jit_settings": {},
+ "ai_settings": {}
},
"allow_supply_host": false,
"port_forward": {
"_comment": "Tunneling settings",
+ "_comment_port": "remote tunneling port",
"port": "2222",
"reuse_port": true
},
"connection" : {
"_comment": "Connections settings per protocol - RDP",
"protocol": "rdp",
+ "_comment_port": "connection port",
"port": "2222",
"allow_supply_user": true,
"administrative_credentials": "admin1",
@@ -292,7 +404,9 @@ Each Machine (pamMachine, pamDatabase, pamDirectory) can specify admin user whic
"tunneling": "on",
"remote_browser_isolation": "on",
"graphical_session_recording": "on",
- "text_session_recording": "on"
+ "text_session_recording": "on",
+ "ai_threat_detection": "off",
+ "ai_terminate_session_on_detection": "off"
},
"allow_supply_host": false,
"port_forward": {
@@ -517,7 +631,9 @@ Each Machine (pamMachine, pamDatabase, pamDirectory) can specify admin user whic
"tunneling": "on",
"remote_browser_isolation": "on",
"graphical_session_recording": "on",
- "text_session_recording": "on"
+ "text_session_recording": "on",
+ "ai_threat_detection": "off",
+ "ai_terminate_session_on_detection": "off"
},
"allow_supply_host": false,
"port_forward": {
diff --git a/keepercommander/commands/pam_import/edit.py b/keepercommander/commands/pam_import/edit.py
index 9a3959dca..fb06bb5bc 100644
--- a/keepercommander/commands/pam_import/edit.py
+++ b/keepercommander/commands/pam_import/edit.py
@@ -21,6 +21,7 @@
from pathlib import Path
from typing import Any, Dict, Optional, List, Union
+from .keeper_ai_settings import set_resource_jit_settings, set_resource_keeper_ai_settings, refresh_meta_to_latest, refresh_link_to_config_to_latest
from ..base import Command, GroupCommand
from ..ksm import KSMCommand
from ..pam import gateway_helper
@@ -36,6 +37,7 @@
from ...error import CommandError
from ...importer import imp_exp
from ...importer.importer import SharedFolder, Permission
+from ...keeper_dag import EdgeType
from ...params import KeeperParams, LAST_FOLDER_UID, LAST_SHARED_FOLDER_UID
from ...proto import record_pb2, APIRequest_pb2, enterprise_pb2
from ...recordv3 import RecordV3
@@ -82,7 +84,12 @@ def __init__(self):
"pam_data": {
"resources": [
{ "type": "pamMachine", "title": "RDP Machine",
- "pam_settings": {"options" : {}, "connection" : {}},
+ "pam_settings": {
+ "options" : {
+ "jit_settings": {},
+ "ai_settings": {}
+ },
+ "connection" : {}},
"users": [
{"type": "pamUser", "login": "admin1", "password": "xyz"},
{"type": "pamUser", "login": "user1", "password": "abc"}
@@ -471,7 +478,9 @@ def process_pam_config(self, params, project: dict) -> dict:
"rotation": "on",
"remotebrowserisolation": "on",
"recording": "on",
- "typescriptrecording": "on"
+ "typescriptrecording": "on",
+ "ai_threat_detection": "off",
+ "ai_terminate_session_on_detection": "off"
})
else:
if pce.port_mapping: args["port_mapping"] = pce.port_mapping
@@ -484,7 +493,9 @@ def process_pam_config(self, params, project: dict) -> dict:
"rotation": pce.rotation,
"remotebrowserisolation": pce.remote_browser_isolation,
"recording": pce.graphical_session_recording,
- "typescriptrecording": pce.text_session_recording
+ "typescriptrecording": pce.text_session_recording,
+ "ai_threat_detection": pce.ai_threat_detection,
+ "ai_terminate_session_on_detection": pce.ai_terminate_session_on_detection
})
if pce.environment == "local":
@@ -1484,6 +1495,7 @@ def process_data(self, params, project):
# pam_settings.connection.administrative_credentials must reference
# one of its own users[] -> userRecords["admin_user_record_UID"]
machines = [x for x in resources if not isinstance(x, PamRemoteBrowserObject)]
+ pam_directories = [x for x in machines if (getattr(x, "type", "") or "").lower() == "pamdirectory"]
for mach in resources:
if not mach: continue
admin_cred = get_admin_credential(mach)
@@ -1523,6 +1535,20 @@ def process_data(self, params, project):
if ruid:
set_user_record_uid(mach, ruid, is_external)
+ # jit_settings.pam_directory_record -> pam_directory_uid (pamDirectory in pam_data.resources by title)
+ if (mach.pam_settings and mach.pam_settings.jit_settings and
+ getattr(mach.pam_settings.jit_settings, "pam_directory_record", None)):
+ jit = mach.pam_settings.jit_settings
+ ref = (jit.pam_directory_record or "").strip()
+ if ref:
+ matches = [x for x in pam_directories if getattr(x, "title", None) == ref]
+ if len(matches) > 1:
+ logging.warning(f"{bcolors.WARNING}Multiple pamDirectory matches for jit_settings.pam_directory_record '{ref}' in {getattr(mach, 'title', mach)}; using first.{bcolors.ENDC}")
+ if len(matches) == 0:
+ logging.error(f"jit_settings.pam_directory_record '{ref}' for '{getattr(mach, 'title', mach)}': no pamDirectory record found in pam_data.resources. Match by title.")
+ else:
+ jit.pam_directory_uid = matches[0].uid
+
# resolve machine PRS creds: additional_credentials[] -> recordRef[]
resolve_script_creds(mach, users, resources)
@@ -1656,6 +1682,34 @@ def process_data(self, params, project):
args = parse_command_options(mach, False)
tdag.set_resource_allowed(**args)
+ # After setting allowedSettings, save JIT settings if present
+ # JIT settings don't apply to RBI records (only machine/db/directory)
+ if mach.pam_settings and mach.pam_settings.jit_settings:
+ jit_dag_dict = mach.pam_settings.jit_settings.to_dag_dict()
+ if jit_dag_dict: # Only save if not empty
+ set_resource_jit_settings(params, mach.uid, jit_dag_dict, pam_cfg_uid)
+
+ # After setting allowedSettings, save AI settings if present
+ # AI settings don't apply to RBI records (only machine/db/directory)
+ if mach.pam_settings and mach.pam_settings.ai_settings:
+ user_id = ""
+ if getattr(params, "account_uid_bytes", None):
+ user_id = utils.base64_url_encode(params.account_uid_bytes)
+ elif getattr(params, "user", ""):
+ user_id = params.user
+ ai_dag_dict = mach.pam_settings.ai_settings.to_dag_dict(user_id=user_id)
+ if ai_dag_dict: # Only save if not empty
+ set_resource_keeper_ai_settings(params, mach.uid, ai_dag_dict, pam_cfg_uid)
+
+            # The web vault UI visualizer shows only the latest DATA edge, and 'meta' is the preferred path.
+            # Note: the DAG may take a while to sync in the web vault.
+            # Dummy update to 'meta' so it becomes the latest DATA edge again (written after jit/ai).
+ if mach.pam_settings and (mach.pam_settings.jit_settings or mach.pam_settings.ai_settings):
+ refresh_meta_to_latest(params, mach.uid, pam_cfg_uid)
+ # Bump LINK to config only when AI is present (AI adds the encryption KEY).
+ if mach.pam_settings and mach.pam_settings.ai_settings:
+ refresh_link_to_config_to_latest(params, mach.uid, pam_cfg_uid)
+
# Machine - create its users (if any)
users = getattr(mach, "users", [])
users = users if isinstance(users, list) else []
@@ -1685,6 +1739,24 @@ def process_data(self, params, project):
prc.execute(params, silent=True, **args)
if resources: print(f"{len(resources)}/{len(resources)}\n")
+ # link machine -> pamDirectory (LINK, path=domain) for jit_settings.pam_directory_uid
+ jit_domain_links_added = False
+ for mach in resources:
+ if not (mach and mach.pam_settings and mach.pam_settings.jit_settings):
+ continue
+ jit = mach.pam_settings.jit_settings
+ dir_uid = getattr(jit, "pam_directory_uid", None)
+ if not dir_uid:
+ continue
+ dag = tdag.linking_dag
+ machine_vertex = dag.get_vertex(mach.uid)
+ dir_vertex = dag.get_vertex(dir_uid)
+ if machine_vertex and dir_vertex:
+ machine_vertex.belongs_to(dir_vertex, EdgeType.LINK, path="domain", content={})
+ jit_domain_links_added = True
+ if jit_domain_links_added:
+ tdag.linking_dag.save()
+
# add scripts with resolved additional credentials - owner records must exist
if pce and pce.scripts and pce.scripts.scripts:
refs = [x for x in pce.scripts.scripts if x.record_refs]
@@ -1729,6 +1801,8 @@ def _initialize(self):
self.remote_browser_isolation: str = "on"
self.graphical_session_recording: str = "on"
self.text_session_recording: str = "on"
+ self.ai_threat_detection: str = "off"
+ self.ai_terminate_session_on_detection: str = "off"
self.port_mapping: List[str] = [] # ex. ["2222=ssh", "33306=mysql"] for discovery, rotation etc.
self.default_rotation_schedule: dict = {} # "type": "On-Demand|CRON"
@@ -1821,6 +1895,10 @@ def __init__(self, environment_type:str, settings:dict, controller_uid:str, fold
if isinstance(val, str) and val in choices: self.graphical_session_recording = val
val = settings.get("text_session_recording", None)
if isinstance(val, str) and val in choices: self.text_session_recording = val
+ val = settings.get("ai_threat_detection", None)
+ if isinstance(val, str) and val in choices: self.ai_threat_detection = val
+ val = settings.get("ai_terminate_session_on_detection", None)
+ if isinstance(val, str) and val in choices: self.ai_terminate_session_on_detection = val
val = settings.get("port_mapping", None) # multiline
if isinstance(val, str): val = [val]
@@ -2186,6 +2264,8 @@ def __init__(self):
self.remote_browser_isolation: Optional[DagOptionValue] = None
self.graphical_session_recording: Optional[DagOptionValue] = None
self.text_session_recording: Optional[DagOptionValue] = None
+ self.ai_threat_detection: Optional[DagOptionValue] = None
+ self.ai_terminate_session_on_detection: Optional[DagOptionValue] = None
# NB! PAM User has its own rotation_settings: {}, cannot enable con/tun on user anyways
# remote_browser_isolation uses rbi, pam_resource, graphical_session_recording
# rotation uses only pam_resource, rotation
@@ -2206,10 +2286,237 @@ def load(cls, data: Union[str, dict]):
obj.remote_browser_isolation = DagOptionValue.map(data.get("remote_browser_isolation", None) or "")
obj.graphical_session_recording = DagOptionValue.map(data.get("graphical_session_recording", None) or "")
obj.text_session_recording = DagOptionValue.map(data.get("text_session_recording", None) or "")
+ obj.ai_threat_detection = DagOptionValue.map(data.get("ai_threat_detection", None) or "")
+ obj.ai_terminate_session_on_detection = DagOptionValue.map(data.get("ai_terminate_session_on_detection", None) or "")
return obj
+class DagJitSettingsObject():
+ def __init__(self):
+ self.create_ephemeral: bool = False
+ self.elevate: bool = False
+ self.elevation_method: str = "group"
+ self.elevation_string: str = ""
+ self.base_distinguished_name: str = ""
+ self.ephemeral_account_type: Optional[str] = None # Omit if missing
+ self.pam_directory_record: Optional[str] = None # Title of pamDirectory from pam_data.resources[], resolved to UID
+ self.pam_directory_uid: Optional[str] = None # Resolved pamDirectory record UID (set in process_data)
+
+ @classmethod
+ def validate_enum_value(cls, value: str, allowed_values: List[str], field_name: str) -> Optional[str]:
+ """Validate value against predefined list. Returns validated value or None if invalid."""
+ if not value or value == "":
+ return None # Empty string not allowed for enum fields
+ value_lower = value.lower()
+ allowed_lower = [v.lower() for v in allowed_values]
+ if value_lower in allowed_lower:
+ # Return original case from allowed_values
+ idx = allowed_lower.index(value_lower)
+ return allowed_values[idx]
+ logging.warning(f"Invalid {field_name} value '{value}'. Allowed: {allowed_values}. Skipping.")
+ return None
+
+ @classmethod
+ def load(cls, data: Union[str, dict]) -> Optional['DagJitSettingsObject']:
+ """Load JIT settings from JSON. Returns None if data is missing/empty."""
+ obj = cls()
+ try:
+ data = json.loads(data) if isinstance(data, str) else data
+ except:
+ logging.error(f"JIT settings failed to load from: {str(data)[:80]}")
+ return None
+
+ if not isinstance(data, dict):
+ return None
+
+ # Check if object is empty (no valid fields)
+ has_valid_fields = False
+
+ # Parse boolean fields with defaults
+ create_ephemeral = utils.value_to_boolean(data.get("create_ephemeral", None))
+ if create_ephemeral is not None:
+ obj.create_ephemeral = create_ephemeral
+ has_valid_fields = True
+
+ elevate = utils.value_to_boolean(data.get("elevate", None))
+ if elevate is not None:
+ obj.elevate = elevate
+ has_valid_fields = True
+
+ # Parse elevation_method with validation (defaults to "group" if missing or invalid)
+ elevation_method = data.get("elevation_method", None)
+ if elevation_method is not None:
+ validated = cls.validate_enum_value(str(elevation_method), ["group", "role"], "elevation_method")
+ if validated:
+ obj.elevation_method = validated
+ has_valid_fields = True
+ # If validation fails, keep default "group" from __init__() - still include in DAG JSON
+ # If missing, keep default "group" from __init__() - still include in DAG JSON
+
+ # Parse string fields
+ elevation_string = data.get("elevation_string", None)
+ if elevation_string is not None and str(elevation_string).strip():
+ obj.elevation_string = str(elevation_string).strip()
+ has_valid_fields = True
+
+ base_distinguished_name = data.get("base_distinguished_name", None)
+ if base_distinguished_name is not None and str(base_distinguished_name).strip():
+ obj.base_distinguished_name = str(base_distinguished_name).strip()
+ has_valid_fields = True
+
+ # Parse ephemeral_account_type with validation (omit if missing)
+ ephemeral_account_type = data.get("ephemeral_account_type", None)
+ if ephemeral_account_type is not None:
+ validated = cls.validate_enum_value(
+ str(ephemeral_account_type),
+ ["linux", "mac", "windows", "domain"],
+ "ephemeral_account_type"
+ )
+ if validated:
+ obj.ephemeral_account_type = validated
+ has_valid_fields = True
+
+ # Parse pam_directory_record (title of pamDirectory from pam_data.resources[]; resolved to pam_directory_uid later)
+ pam_directory_record = data.get("pam_directory_record", None)
+ if pam_directory_record is not None and str(pam_directory_record).strip():
+ obj.pam_directory_record = str(pam_directory_record).strip()
+ has_valid_fields = True
+
+ # Silently ignore any other unknown fields (permissive parsing)
+
+ # Return None if no valid fields were found (empty object)
+ return obj if has_valid_fields else None
+
+ def to_dag_dict(self) -> Dict[str, Any]:
+ """Convert to DAG JSON format (camelCase)."""
+ result = {
+ "createEphemeral": self.create_ephemeral,
+ "elevate": self.elevate,
+ "elevationMethod": self.elevation_method, # Always included (defaults to "group" if missing/invalid)
+ "elevationString": self.elevation_string,
+ "baseDistinguishedName": self.base_distinguished_name
+ }
+ # Only include ephemeralAccountType if it was set (omit if missing/invalid)
+ if self.ephemeral_account_type:
+ result["ephemeralAccountType"] = self.ephemeral_account_type
+ return result
+
+
+class DagAiSettingsObject():
+ def __init__(self):
+ self.version: str = "v1.0.0"
+ self.risk_levels: Dict[str, Dict[str, Any]] = {}
+
+ @classmethod
+ def _parse_tag_list(cls, items: Any) -> List[str]:
+ tags: List[str] = []
+ if not isinstance(items, list):
+ return tags
+ for item in items:
+ tag = ""
+ if isinstance(item, str):
+ tag = item.strip()
+ elif isinstance(item, dict):
+ tag = str(item.get("tag", "")).strip()
+ if tag:
+ tags.append(tag)
+ return tags
+
+ @classmethod
+ def load(cls, data: Union[str, dict]) -> Optional['DagAiSettingsObject']:
+ """Load AI settings from JSON. Returns None if data is missing/empty."""
+ obj = cls()
+ try:
+ data = json.loads(data) if isinstance(data, str) else data
+ except:
+ logging.error(f"AI settings failed to load from: {str(data)[:80]}")
+ return None
+
+ if not isinstance(data, dict):
+ return None
+
+ risk_levels = data.get("risk_levels", None)
+ if not isinstance(risk_levels, dict):
+ return None
+
+ for level in ["critical", "high", "medium", "low"]:
+ level_data = risk_levels.get(level, None)
+ if not isinstance(level_data, dict):
+ continue
+
+ ai_session_terminate = utils.value_to_boolean(level_data.get("ai_session_terminate", None))
+ activities = level_data.get("activities", None) or {}
+ if not isinstance(activities, dict):
+ activities = {}
+
+ allow_tags = cls._parse_tag_list(activities.get("allow", []))
+ deny_tags = cls._parse_tag_list(activities.get("deny", []))
+
+ if ai_session_terminate is None and not allow_tags and not deny_tags:
+ continue
+
+ obj.risk_levels[level] = {
+ "ai_session_terminate": ai_session_terminate,
+ "allow": allow_tags,
+ "deny": deny_tags
+ }
+
+ return obj if obj.risk_levels else None
+
+ def _build_tag_entries(self, tags: List[str], action: str, user_id: str) -> List[Dict[str, Any]]:
+ entries: List[Dict[str, Any]] = []
+ for tag in tags:
+ if not tag:
+ continue
+ entries.append({
+ "tag": tag,
+ "auditLog": [{
+ "date": utils.current_milli_time(),
+ "userId": user_id,
+ "action": action
+ }]
+ })
+ return entries
+
+ def to_dag_dict(self, user_id: str) -> Optional[Dict[str, Any]]:
+ if not self.risk_levels:
+ return None
+
+ if not user_id:
+ logging.warning("AI settings auditLog userId is missing; auditLog will have empty userId.")
+ user_id = ""
+
+ risk_levels: Dict[str, Any] = {}
+ for level, data in self.risk_levels.items():
+ level_out: Dict[str, Any] = {}
+
+ if data.get("ai_session_terminate") is not None:
+ level_out["aiSessionTerminate"] = data["ai_session_terminate"]
+
+ tags_out: Dict[str, Any] = {}
+ allow_entries = self._build_tag_entries(data.get("allow", []), "added_to_allow", user_id)
+ if allow_entries:
+ tags_out["allow"] = allow_entries
+ deny_entries = self._build_tag_entries(data.get("deny", []), "added_to_deny", user_id)
+ if deny_entries:
+ tags_out["deny"] = deny_entries
+
+ if tags_out:
+ level_out["tags"] = tags_out
+
+ if level_out:
+ risk_levels[level] = level_out
+
+ if not risk_levels:
+ return None
+
+ return {
+ "version": self.version,
+ "riskLevels": risk_levels
+ }
+
+
class PamUserObject():
def __init__(self):
self.uid = ""
@@ -4234,11 +4541,15 @@ def __init__(
connection: PamConnectionSettings = None, # Optional[PamConnectionSettings]
portForward: Optional[PamPortForwardSettings] = None,
options: Optional[DagSettingsObject] = None,
+ jit_settings: Optional[DagJitSettingsObject] = None,
+ ai_settings: Optional[DagAiSettingsObject] = None,
):
self.allowSupplyHost = allowSupplyHost
self.connection = connection
self.portForward = portForward
self.options = options
+ self.jit_settings = jit_settings
+ self.ai_settings = ai_settings
# PamConnectionSettings excludes ConnectionSettingsHTTP
pam_connection_classes = [
@@ -4283,6 +4594,24 @@ def load(cls, data: Union[str, dict]):
if not is_empty_instance(options):
obj.options = options
+ # Parse jit_settings from options dict (nested inside options)
+ options_dict = data.get("options", {})
+ if isinstance(options_dict, dict):
+ jit_value = options_dict.get("jit_settings", None)
+ if jit_value is not None:
+ jit_settings = DagJitSettingsObject.load(jit_value)
+ if jit_settings:
+ obj.jit_settings = jit_settings
+
+ # Parse ai_settings from options dict (nested inside options)
+ options_dict = data.get("options", {})
+ if isinstance(options_dict, dict):
+ ai_value = options_dict.get("ai_settings", None)
+ if ai_value is not None:
+ ai_settings = DagAiSettingsObject.load(ai_value)
+ if ai_settings:
+ obj.ai_settings = ai_settings
+
portForward = PamPortForwardSettings.load(data.get("port_forward", {}))
if not is_empty_instance(portForward):
obj.portForward = portForward
@@ -4465,6 +4794,14 @@ def parse_command_options(obj, enable:bool) -> dict:
val = opts.remote_browser_isolation.value if opts.remote_browser_isolation else ""
key = "enable_remote_browser_isolation" if val == "on" else "disable_remote_browser_isolation" if val == "off" else None
if key is not None: args[key] = True
+ # AI and JIT settings don't apply to RBI records
+ if not isinstance(obj, PamRemoteBrowserObject):
+ val = opts.ai_threat_detection.value if opts.ai_threat_detection else ""
+ key = "enable_ai_threat_detection" if val == "on" else "disable_ai_threat_detection" if val == "off" else None
+ if key is not None: args[key] = True
+ val = opts.ai_terminate_session_on_detection.value if opts.ai_terminate_session_on_detection else ""
+ key = "enable_ai_terminate_session_on_detection" if val == "on" else "disable_ai_terminate_session_on_detection" if val == "off" else None
+ if key is not None: args[key] = True
else: # TunnelDAG.set_resource_allowed format rotation=True/False
if opts.rotation and opts.rotation.value in ("on", "off"):
args["rotation"] = choices[opts.rotation.value]
@@ -4480,6 +4817,12 @@ def parse_command_options(obj, enable:bool) -> dict:
args["session_recording"] = choices[opts.graphical_session_recording.value]
if opts.remote_browser_isolation and opts.remote_browser_isolation.value in ("on", "off"):
args["remote_browser_isolation"] = choices[opts.remote_browser_isolation.value]
+ # AI and JIT settings don't apply to RBI records
+ if not isinstance(obj, PamRemoteBrowserObject):
+ if opts.ai_threat_detection and opts.ai_threat_detection.value in ("on", "off"):
+ args["ai_enabled"] = choices[opts.ai_threat_detection.value]
+ if opts.ai_terminate_session_on_detection and opts.ai_terminate_session_on_detection.value in ("on", "off"):
+ args["ai_session_terminate"] = choices[opts.ai_terminate_session_on_detection.value]
return args
diff --git a/keepercommander/commands/pam_import/keeper_ai_settings.py b/keepercommander/commands/pam_import/keeper_ai_settings.py
new file mode 100644
index 000000000..9fbd30b9a
--- /dev/null
+++ b/keepercommander/commands/pam_import/keeper_ai_settings.py
@@ -0,0 +1,846 @@
+"""
+Manage Keeper AI and JIT settings stored on DAG DATA edges for a PAM resource.
+
+This module provides functionality to retrieve, parse, and save Keeper AI
+risk-level settings (Critical/High/Medium/Low with Monitor/Terminate actions) and JIT settings
+from the DAG DATA edges with paths 'ai_settings' and 'jit_settings' on a resource vertex.
+"""
+
+import base64
+import json
+import logging
+from typing import Optional, Dict, Any, List
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+
+from ...params import KeeperParams
+from ...keeper_dag import DAG, EdgeType
+from ...keeper_dag.connection.commander import Connection
+from ...keeper_dag.types import PamEndpoints
+from ...vault import PasswordRecord
+from ... import vault
+from ...display import bcolors
+from ..tunnel.port_forward.tunnel_helpers import get_config_uid, generate_random_bytes, get_keeper_tokens
+
+
+def list_resource_data_edges(
+ params: KeeperParams,
+ resource_uid: str,
+ config_uid: Optional[str] = None
+) -> List[Dict[str, Any]]:
+ """
+ List all DATA edges on a resource vertex to inspect available paths.
+
+ This is useful for discovering what settings are stored in the DAG,
+ such as 'ai_settings', 'jit_settings', etc.
+
+ Args:
+ params: KeeperParams instance
+ resource_uid: UID of the PAM resource
+ config_uid: Optional PAM config UID. If not provided, will be looked up.
+
+ Returns:
+ List of dictionaries containing edge information:
+ [{"path": "ai_settings", "version": 0, "active": True, "is_encrypted": True}, ...]
+ """
+ try:
+ # Get the record to access the record key
+ record = vault.KeeperRecord.load(params, resource_uid)
+ if not record:
+ logging.warning(f"Record {resource_uid} not found")
+ return []
+
+ # Get record key for decryption
+ record_key = None
+ if resource_uid in params.record_cache:
+ record_data = params.record_cache[resource_uid]
+ record_key = record_data.get('record_key_unencrypted')
+
+ if not record_key:
+ logging.warning(f"Record key not available for {resource_uid}")
+ return []
+
+ # Get config UID if not provided
+ if not config_uid:
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+ config_uid = get_config_uid(params, encrypted_session_token, encrypted_transmission_key, resource_uid)
+ if not config_uid:
+ config_uid = resource_uid
+
+ # Create DAG connection
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+
+ # Create a dummy record for DAG
+ dag_record = PasswordRecord()
+ dag_record.record_uid = config_uid
+ dag_record.record_key = record_key # Use record key to decrypt keychain
+
+ conn = Connection(
+ params=params,
+ encrypted_transmission_key=encrypted_transmission_key,
+ encrypted_session_token=encrypted_session_token,
+ transmission_key=transmission_key,
+ use_write_protobuf=True
+ )
+
+ # Load the DAG
+ linking_dag = DAG(
+ conn=conn,
+ record=dag_record,
+ graph_id=0,
+ write_endpoint=PamEndpoints.PAM
+ )
+ linking_dag.load()
+
+ # Get the resource vertex
+ resource_vertex = linking_dag.get_vertex(resource_uid)
+ if not resource_vertex:
+ logging.warning(f"Resource vertex {resource_uid} not found in DAG")
+ return []
+
+ # Collect all DATA edges
+ data_edges = []
+ for edge in resource_vertex.edges:
+ if edge and edge.edge_type == EdgeType.DATA:
+ data_edges.append({
+ "path": edge.path,
+ "version": edge.version,
+ "active": edge.active,
+ "is_encrypted": edge.is_encrypted,
+ "has_content": edge.content is not None
+ })
+
+ return data_edges
+
+ except Exception as e:
+ logging.error(f"Error listing DATA edges for {resource_uid}: {e}", exc_info=True)
+ return []
+
+
+def get_resource_settings(
+ params: KeeperParams,
+ resource_uid: str,
+ dag_path: str,
+ config_uid: Optional[str] = None
+) -> Optional[Dict[str, Any]]:
+ """
+ Generic function to retrieve settings from a DAG DATA edge with the specified path for a resource.
+
+ The settings are stored as encrypted JSON in a DATA edge on the resource vertex
+ with the specified path. This function:
+ 1. Loads the DAG for the resource
+ 2. Finds the DATA edge with the specified path
+ 3. Decrypts the content using the record key
+ 4. Parses the JSON to return the settings structure
+
+ Args:
+ params: KeeperParams instance
+ resource_uid: UID of the PAM resource (pamMachine, pamDatabase, etc.)
+ dag_path: Path of the DATA edge (e.g., 'ai_settings', 'jit_settings')
+ config_uid: Optional PAM config UID. If not provided, will be looked up.
+
+ Returns:
+ Dictionary containing settings if found, None otherwise.
+ """
+ try:
+ # Get the record to access the record key
+ record = vault.KeeperRecord.load(params, resource_uid)
+ if not record:
+ logging.warning(f"Record {resource_uid} not found")
+ return None
+
+ # Get record key for decryption
+ record_key = None
+ if resource_uid in params.record_cache:
+ record_data = params.record_cache[resource_uid]
+ record_key = record_data.get('record_key_unencrypted')
+
+ if not record_key:
+ logging.warning(f"Record key not available for {resource_uid}")
+ return None
+
+ # Get config UID if not provided
+ if not config_uid:
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+ config_uid = get_config_uid(params, encrypted_session_token, encrypted_transmission_key, resource_uid)
+ if not config_uid:
+ config_uid = resource_uid
+
+ # Create DAG connection
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+
+ # Create a dummy record for DAG (uses config UID as record UID)
+ dag_record = PasswordRecord()
+ dag_record.record_uid = config_uid
+ dag_record.record_key = record_key # Use record key to decrypt keychain
+
+ conn = Connection(
+ params=params,
+ encrypted_transmission_key=encrypted_transmission_key,
+ encrypted_session_token=encrypted_session_token,
+ transmission_key=transmission_key,
+ use_write_protobuf=True
+ )
+
+ # Load the DAG
+ linking_dag = DAG(
+ conn=conn,
+ record=dag_record,
+ graph_id=0,
+ write_endpoint=PamEndpoints.PAM
+ )
+ linking_dag.load()
+
+ # Get the resource vertex
+ resource_vertex = linking_dag.get_vertex(resource_uid)
+ if not resource_vertex:
+ logging.warning(f"Resource vertex {resource_uid} not found in DAG")
+ return None
+
+ # Find the DATA edge with the specified path (get highest version, regardless of active status)
+ settings_edge = None
+ highest_version = -1
+ for edge in resource_vertex.edges:
+ if edge and (edge.edge_type == EdgeType.DATA and
+ edge.path == dag_path):
+ if edge.version > highest_version:
+ highest_version = edge.version
+ settings_edge = edge
+
+ if not settings_edge:
+ logging.debug(f"No '{dag_path}' DATA edge found for resource {resource_uid}")
+ return None
+
+ # Get the content from the edge
+ edge_content = settings_edge.content
+ if not edge_content:
+ logging.debug(f"'{dag_path}' edge has no content for resource {resource_uid}")
+ return None
+
+ # Check if content appears to be encrypted (binary data that's not valid UTF-8)
+ # Even if is_encrypted is False, inactive edges might not have been decrypted
+ is_encrypted_content = settings_edge.is_encrypted
+ if not is_encrypted_content and isinstance(edge_content, bytes):
+ # Try to detect if it's encrypted by attempting to decode
+ # Encrypted content will fail UTF-8 decoding
+ try:
+ test_decode = edge_content.decode('utf-8')
+ # If decode succeeds, check if it looks like JSON (starts with { or [)
+ if not (test_decode.strip().startswith('{') or test_decode.strip().startswith('[')):
+ # Doesn't look like JSON, might be encrypted
+ is_encrypted_content = True
+ except (UnicodeDecodeError, AttributeError):
+ # Decode failed - likely encrypted binary data
+ is_encrypted_content = True
+
+ # Check if edge is still encrypted
+ if is_encrypted_content:
+ # Content is encrypted - need to decrypt manually using record key
+ if not isinstance(edge_content, (bytes, str)):
+ logging.warning(f"Unexpected encrypted content type: {type(edge_content)}")
+ return None
+
+ # Convert to bytes if it's a string (base64 encoded)
+ if isinstance(edge_content, str):
+ try:
+ edge_content = base64.urlsafe_b64decode(edge_content + '==')
+ except Exception as e:
+ logging.warning(f"Failed to decode base64 content: {e}")
+ return None
+
+ # Decrypt using AES-GCM
+ try:
+ if len(edge_content) < 12:
+ logging.warning(f"Encrypted content too short: {len(edge_content)} bytes")
+ return None
+
+ aesgcm = AESGCM(record_key)
+ nonce = edge_content[:12]
+ ciphertext = edge_content[12:]
+ decrypted_bytes = aesgcm.decrypt(nonce, ciphertext, None)
+ except Exception as e:
+ logging.warning(f"Failed to decrypt {dag_path} content: {e}")
+ return None
+
+ # Parse JSON
+ try:
+ settings = json.loads(decrypted_bytes.decode('utf-8'))
+ return settings
+ except Exception as e:
+ logging.warning(f"Failed to parse {dag_path} JSON: {e}")
+ return None
+ else:
+ # Content is already decrypted by DAG
+ if isinstance(edge_content, dict):
+ return edge_content
+
+ if isinstance(edge_content, str):
+ try:
+ return json.loads(edge_content)
+ except Exception as e:
+ logging.warning(f"Failed to parse already-decrypted content: {e}")
+ return None
+
+ if isinstance(edge_content, bytes):
+ # Try to decode as UTF-8 JSON
+ try:
+ decoded_str = edge_content.decode('utf-8')
+ return json.loads(decoded_str)
+ except UnicodeDecodeError:
+ # If UTF-8 decode fails, it might still be encrypted
+ # Try decrypting it
+ try:
+ if len(edge_content) >= 12:
+ aesgcm = AESGCM(record_key)
+ nonce = edge_content[:12]
+ ciphertext = edge_content[12:]
+ decrypted_bytes = aesgcm.decrypt(nonce, ciphertext, None)
+ return json.loads(decrypted_bytes.decode('utf-8'))
+ except Exception as decrypt_error:
+ logging.warning(f"Content appears encrypted but decryption failed: {decrypt_error}")
+ return None
+ except Exception as e:
+ logging.warning(f"Failed to decode bytes content: {e}")
+ return None
+
+ logging.warning(f"Unexpected decrypted content type: {type(edge_content)}")
+ return None
+
+ except Exception as e:
+ logging.error(f"Error retrieving {dag_path} settings for {resource_uid}: {e}", exc_info=True)
+ return None
+
+
+def get_resource_jit_settings(
+ params: KeeperParams,
+ resource_uid: str,
+ config_uid: Optional[str] = None
+) -> Optional[Dict[str, Any]]:
+ """
+ Retrieve JIT settings from the DAG DATA edge with path 'jit_settings' for a resource.
+
+ This function checks if JIT settings are stored in a DAG DATA edge similar to AI settings.
+ The settings are expected to be stored as encrypted JSON in a DATA edge on the resource vertex
+ with path 'jit_settings'.
+
+ Args:
+ params: KeeperParams instance
+ resource_uid: UID of the PAM resource (pamMachine, pamDatabase, etc.)
+ config_uid: Optional PAM config UID. If not provided, will be looked up.
+
+ Returns:
+ Dictionary containing JIT settings if found, None otherwise.
+ """
+ return get_resource_settings(params, resource_uid, 'jit_settings', config_uid)
+
+
+def get_resource_keeper_ai_settings(
+ params: KeeperParams,
+ resource_uid: str,
+ config_uid: Optional[str] = None
+) -> Optional[Dict[str, Any]]:
+ """
+ Retrieve KeeperAI settings from the DAG DATA edge with path 'ai_settings' for a resource.
+
+ The settings are stored as encrypted JSON in a DATA edge on the resource vertex
+ with path 'ai_settings'. This function:
+ 1. Loads the DAG for the resource
+ 2. Finds the DATA edge with path 'ai_settings'
+ 3. Decrypts the content using the record key
+ 4. Parses the JSON to return the KeeperAISettings structure
+
+ Args:
+ params: KeeperParams instance
+ resource_uid: UID of the PAM resource (pamMachine, pamDatabase, etc.)
+ config_uid: Optional PAM config UID. If not provided, will be looked up.
+
+ Returns:
+ Dictionary containing KeeperAI settings with structure:
+ {
+ "version": "v1.0.0",
+ "riskLevels": {
+ "critical": {
+ "aiSessionTerminate": bool,
+ "tags": {
+ "allow": [...],
+ "deny": [...]
+ }
+ },
+ "high": {...},
+ "medium": {...},
+ "low": {
+ "aiSessionTerminate": bool,
+ "tags": {
+ "allow": [...]
+ }
+ }
+ }
+ }
+ Returns None if settings not found or error occurred.
+ """
+ return get_resource_settings(params, resource_uid, 'ai_settings', config_uid)
+
+
+def set_resource_keeper_ai_settings(
+ params: KeeperParams,
+ resource_uid: str,
+ settings: Dict[str, Any],
+ config_uid: Optional[str] = None
+) -> bool:
+ """
+ Save KeeperAI settings to the DAG DATA edge with path 'ai_settings' for a resource.
+
+ The settings are stored as encrypted JSON in a DATA edge on the resource vertex
+ with path 'ai_settings'. This function:
+ 1. Loads the DAG for the resource
+ 2. Finds and deactivates any existing 'ai_settings' DATA edge
+ 3. Adds a new DATA edge with the provided settings
+ 4. Encrypts the content using the record key
+ 5. Saves the DAG
+
+ Args:
+ params: KeeperParams instance
+ resource_uid: UID of the PAM resource (pamMachine, pamDatabase, etc.)
+ settings: Dictionary containing KeeperAI settings to save
+ config_uid: Optional PAM config UID. If not provided, will be looked up.
+
+ Returns:
+ True if settings were saved successfully, False otherwise.
+ """
+ try:
+ # Get the record to access the record key
+ record = vault.KeeperRecord.load(params, resource_uid)
+ if not record:
+ logging.warning(f"Record {resource_uid} not found")
+ return False
+
+ # Get record key for encryption
+ record_key = None
+ if resource_uid in params.record_cache:
+ record_data = params.record_cache[resource_uid]
+ record_key = record_data.get('record_key_unencrypted')
+
+ if not record_key:
+ logging.warning(f"Record key not available for {resource_uid}")
+ return False
+
+ # Validate settings structure
+ if not isinstance(settings, dict):
+ logging.warning("Settings must be a dictionary")
+ return False
+
+ # Get config UID if not provided
+ if not config_uid:
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+ config_uid = get_config_uid(params, encrypted_session_token, encrypted_transmission_key, resource_uid)
+ if not config_uid:
+ config_uid = resource_uid
+
+ # Create DAG connection
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+
+ # Create a dummy record for DAG (uses config UID as record UID)
+ dag_record = PasswordRecord()
+ dag_record.record_uid = config_uid
+ dag_record.record_key = record_key # Use actual record key for encryption
+
+ conn = Connection(
+ params=params,
+ encrypted_transmission_key=encrypted_transmission_key,
+ encrypted_session_token=encrypted_session_token,
+ transmission_key=transmission_key,
+ use_write_protobuf=True
+ )
+
+ # Load the DAG with decryption enabled
+ linking_dag = DAG(
+ conn=conn,
+ record=dag_record,
+ graph_id=0,
+ write_endpoint=PamEndpoints.PAM,
+ decrypt=True # Enable decryption so we can encrypt on save
+ )
+ linking_dag.load()
+
+ # Get the resource vertex
+ resource_vertex = linking_dag.get_vertex(resource_uid)
+ if not resource_vertex:
+ logging.warning(f"Resource vertex {resource_uid} not found in DAG")
+ return False
+
+ # Ensure the vertex keychain has the record key for encryption
+ # The DAG save will use vertex.key (first key in keychain) to encrypt DATA edges
+ if not resource_vertex.keychain or len(resource_vertex.keychain) == 0:
+ resource_vertex.keychain = [record_key]
+ else:
+ # Ensure record key is in keychain (prepend it so it's the first/primary key)
+ keychain = resource_vertex.keychain
+ if record_key not in keychain:
+ keychain.insert(0, record_key)
+ resource_vertex.keychain = keychain
+
+ # Ensure there is a KEY edge so DATA edges can be added/encrypted.
+ # Prefer existing parent vertices; fall back to root if none exist.
+ if not resource_vertex.has_key:
+ parent_vertices = resource_vertex.belongs_to_vertices()
+ if parent_vertices:
+ resource_vertex.belongs_to(parent_vertices[0], edge_type=EdgeType.KEY)
+ else:
+ resource_vertex.belongs_to_root(EdgeType.KEY)
+ logging.debug(f"Added KEY edge for resource {resource_uid} to enable DATA encryption")
+
+ # Find and deactivate existing 'ai_settings' edge for proper versioning
+ existing_edge = None
+ for edge in resource_vertex.edges:
+ if edge and (edge.edge_type == EdgeType.DATA and
+ edge.path == 'ai_settings' and
+ edge.active):
+ existing_edge = edge
+ break
+
+ if existing_edge:
+ # Deactivate the existing edge
+ existing_edge.active = False
+ logging.debug(f"Deactivated existing 'ai_settings' edge (version {existing_edge.version})")
+
+ # Add new DATA edge with the settings
+ # The DAG will automatically encrypt it on save using vertex.key (record key)
+ resource_vertex.add_data(
+ content=settings, # Will be serialized to JSON and encrypted on save
+ path='ai_settings',
+ needs_encryption=True,
+ modified=True
+ )
+
+ # Save the DAG
+ linking_dag.save()
+
+ logging.debug(f"Successfully saved KeeperAI settings for resource {resource_uid}")
+ return True
+
+ except Exception as e:
+ logging.error(f"Error saving KeeperAI settings for {resource_uid}: {e}", exc_info=True)
+ return False
+
+
+def set_resource_jit_settings(
+ params: KeeperParams,
+ resource_uid: str,
+ settings: Dict[str, Any],
+ config_uid: Optional[str] = None
+) -> bool:
+ """
+ Save JIT settings to the DAG DATA edge with path 'jit_settings' for a resource.
+
+ The settings are stored as encrypted JSON in a DATA edge on the resource vertex
+ with path 'jit_settings'. This function:
+ 1. Loads the DAG for the resource
+ 2. Finds and deactivates any existing 'jit_settings' DATA edge
+ 3. Adds a new DATA edge with the provided settings
+ 4. Encrypts the content using the record key
+ 5. Saves the DAG
+
+ Args:
+ params: KeeperParams instance
+ resource_uid: UID of the PAM resource (pamMachine, pamDatabase, pamDirectory)
+ settings: Dictionary containing JIT settings to save
+ config_uid: Optional PAM config UID. If not provided, will be looked up.
+
+ Returns:
+ True if settings were saved successfully, False otherwise.
+ """
+ try:
+ # Return False if settings dict is empty
+ if not settings or not isinstance(settings, dict):
+ logging.debug(f"JIT settings empty or invalid for {resource_uid}, skipping")
+ return False
+
+ # Get the record to access the record key
+ record = vault.KeeperRecord.load(params, resource_uid)
+ if not record:
+ logging.warning(f"Record {resource_uid} not found")
+ return False
+
+ # Get record key for encryption
+ record_key = None
+ if resource_uid in params.record_cache:
+ record_data = params.record_cache[resource_uid]
+ record_key = record_data.get('record_key_unencrypted')
+
+ if not record_key:
+ logging.warning(f"Record key not available for {resource_uid}")
+ return False
+
+ # Get config UID if not provided
+ if not config_uid:
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+ config_uid = get_config_uid(params, encrypted_session_token, encrypted_transmission_key, resource_uid)
+ if not config_uid:
+ config_uid = resource_uid
+
+ # Create DAG connection
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+
+ # Create a dummy record for DAG (uses config UID as record UID)
+ dag_record = PasswordRecord()
+ dag_record.record_uid = config_uid
+ dag_record.record_key = record_key # Use actual record key for encryption
+
+ conn = Connection(
+ params=params,
+ encrypted_transmission_key=encrypted_transmission_key,
+ encrypted_session_token=encrypted_session_token,
+ transmission_key=transmission_key,
+ use_write_protobuf=True
+ )
+
+ # Load the DAG with decryption enabled
+ linking_dag = DAG(
+ conn=conn,
+ record=dag_record,
+ graph_id=0,
+ write_endpoint=PamEndpoints.PAM,
+ decrypt=True # Enable decryption so we can encrypt on save
+ )
+ linking_dag.load()
+
+ # Get the resource vertex
+ resource_vertex = linking_dag.get_vertex(resource_uid)
+ if not resource_vertex:
+ logging.warning(f"Resource vertex {resource_uid} not found in DAG")
+ return False
+
+ # Ensure the vertex keychain has the record key for encryption
+ # The DAG save will use vertex.key (first key in keychain) to encrypt DATA edges
+ if not resource_vertex.keychain or len(resource_vertex.keychain) == 0:
+ resource_vertex.keychain = [record_key]
+ else:
+ # Ensure record key is in keychain (prepend it so it's the first/primary key)
+ keychain = resource_vertex.keychain
+ if record_key not in keychain:
+ keychain.insert(0, record_key)
+ resource_vertex.keychain = keychain
+
+ # Ensure there is a KEY edge so DATA edges can be added/encrypted.
+ # Prefer existing parent vertices; fall back to root if none exist.
+ if not resource_vertex.has_key:
+ parent_vertices = resource_vertex.belongs_to_vertices()
+ if parent_vertices:
+ resource_vertex.belongs_to(parent_vertices[0], edge_type=EdgeType.KEY)
+ else:
+ resource_vertex.belongs_to_root(EdgeType.KEY)
+ logging.debug(f"Added KEY edge for resource {resource_uid} to enable DATA encryption")
+
+ # Find and deactivate existing 'jit_settings' edge for proper versioning
+ existing_edge = None
+ for edge in resource_vertex.edges:
+ if edge and (edge.edge_type == EdgeType.DATA and
+ edge.path == 'jit_settings' and
+ edge.active):
+ existing_edge = edge
+ break
+
+ if existing_edge:
+ # Deactivate the existing edge
+ existing_edge.active = False
+ logging.debug(f"Deactivated existing 'jit_settings' edge (version {existing_edge.version})")
+
+ # Add new DATA edge with the settings
+ # The DAG will automatically encrypt it on save using vertex.key (record key)
+ resource_vertex.add_data(
+ content=settings, # Will be serialized to JSON and encrypted on save
+ path='jit_settings',
+ needs_encryption=True,
+ modified=True
+ )
+
+ # Save the DAG
+ linking_dag.save()
+
+ logging.debug(f"Successfully saved JIT settings for resource {resource_uid}")
+ return True
+
+ except Exception as e:
+ logging.error(f"Error saving JIT settings for {resource_uid}: {e}", exc_info=True)
+ return False
+
+
+def refresh_meta_to_latest(
+ params: KeeperParams,
+ resource_uid: str,
+ config_uid: Optional[str] = None
+) -> bool:
+ """
+ Re-add the meta DATA edge with the same content so meta becomes the latest
+ (highest version) and appears on top. Call after writing jit_settings and/or
+ ai_settings.
+ """
+ try:
+ record = vault.KeeperRecord.load(params, resource_uid)
+ if not record:
+ return False
+ record_key = None
+ if resource_uid in params.record_cache:
+ record_data = params.record_cache[resource_uid]
+ record_key = record_data.get('record_key_unencrypted')
+ if not record_key:
+ return False
+ if not config_uid:
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+ config_uid = get_config_uid(params, encrypted_session_token, encrypted_transmission_key, resource_uid)
+ if not config_uid:
+ config_uid = resource_uid
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+ dag_record = PasswordRecord()
+ dag_record.record_uid = config_uid
+ dag_record.record_key = record_key
+ conn = Connection(
+ params=params,
+ encrypted_transmission_key=encrypted_transmission_key,
+ encrypted_session_token=encrypted_session_token,
+ transmission_key=transmission_key,
+ use_write_protobuf=True
+ )
+ linking_dag = DAG(
+ conn=conn,
+ record=dag_record,
+ graph_id=0,
+ write_endpoint=PamEndpoints.PAM,
+ decrypt=True
+ )
+ linking_dag.load()
+ resource_vertex = linking_dag.get_vertex(resource_uid)
+ if not resource_vertex:
+ return False
+ meta_edges = [e for e in (resource_vertex.edges or [])
+ if e and getattr(e, 'edge_type', None) == EdgeType.DATA
+ and getattr(e, 'path', None) == 'meta']
+ if not meta_edges:
+ return False
+ best = max(meta_edges, key=lambda e: getattr(e, 'version', -1))
+ try:
+ content = best.content_as_dict
+ except Exception:
+ return False
+ if content is None:
+ return False
+ resource_vertex.add_data(content=content, path='meta', needs_encryption=False)
+ linking_dag.save()
+ return True
+ except Exception as e:
+ logging.debug(f"refresh_meta_to_latest for {resource_uid}: {e}")
+ return False
+
+
+def refresh_link_to_config_to_latest(
+ params: KeeperParams,
+ resource_uid: str,
+ config_uid: Optional[str] = None
+) -> bool:
+ """
+ Re-add the LINK edge (resource -> PAM config) with empty path and content {}
+ so that LINK, not KEY, is the latest edge to the config. JIT/AI writes add a
+ KEY edge for encryption, while a normal record's visible link to the PAM config
+ is the LINK edge. Call after writing jit_settings and/or ai_settings.
+ """
+ try:
+ record = vault.KeeperRecord.load(params, resource_uid)
+ if not record:
+ return False
+ record_key = None
+ if resource_uid in params.record_cache:
+ record_data = params.record_cache[resource_uid]
+ record_key = record_data.get('record_key_unencrypted')
+ if not record_key:
+ return False
+ if not config_uid:
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+ config_uid = get_config_uid(params, encrypted_session_token, encrypted_transmission_key, resource_uid)
+ if not config_uid:
+ config_uid = resource_uid
+ encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
+ dag_record = PasswordRecord()
+ dag_record.record_uid = config_uid
+ dag_record.record_key = record_key
+ conn = Connection(
+ params=params,
+ encrypted_transmission_key=encrypted_transmission_key,
+ encrypted_session_token=encrypted_session_token,
+ transmission_key=transmission_key,
+ use_write_protobuf=True
+ )
+ linking_dag = DAG(
+ conn=conn,
+ record=dag_record,
+ graph_id=0,
+ write_endpoint=PamEndpoints.PAM,
+ decrypt=True
+ )
+ linking_dag.load()
+ resource_vertex = linking_dag.get_vertex(resource_uid)
+ config_vertex = linking_dag.get_vertex(config_uid)
+ if not resource_vertex or not config_vertex:
+ return False
+ # Re-add LINK (path empty, content {}) so it becomes latest, above KEY added by JIT/AI
+ resource_vertex.belongs_to(config_vertex, EdgeType.LINK, path=None, content={})
+ linking_dag.save()
+ return True
+ except Exception as e:
+ logging.debug(f"refresh_link_to_config_to_latest for {resource_uid}: {e}")
+ return False
+
+
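The two refresh helpers are meant to run after the JIT/AI DATA edges have been written, so that `meta` and the LINK edge remain the newest edges on the resource vertex. A minimal sketch of the intended call order; `write_jit_settings` is a hypothetical stand-in for the settings writer above, while the refresh helpers are the ones defined in this module:

```python
# Hedged sketch of the call order after updating JIT settings.
# `write_jit_settings` is a stand-in name; only the refresh helpers below are real.
def apply_jit_settings(params, resource_uid, settings, config_uid=None):
    if not write_jit_settings(params, resource_uid, settings, config_uid):  # hypothetical
        return False
    # Re-add 'meta' and the LINK edge so they stay the latest edges on the vertex.
    refresh_meta_to_latest(params, resource_uid, config_uid)
    refresh_link_to_config_to_latest(params, resource_uid, config_uid)
    return True
```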
+def print_keeper_ai_settings(params: KeeperParams, resource_uid: str, config_uid: Optional[str] = None):
+ """
+ Print KeeperAI settings in a human-readable format.
+
+ Args:
+ params: KeeperParams instance
+ resource_uid: UID of the PAM resource
+ config_uid: Optional PAM config UID
+ """
+ settings = get_resource_keeper_ai_settings(params, resource_uid, config_uid)
+
+ if not settings:
+ print(f"{bcolors.WARNING}No KeeperAI settings found for resource {resource_uid}{bcolors.ENDC}")
+ return
+
+ print(f"\n{bcolors.OKBLUE}KeeperAI Settings for Resource: {resource_uid}{bcolors.ENDC}")
+ print(f"Version: {settings.get('version', 'unknown')}")
+ print(f"\n{bcolors.OKGREEN}Risk Level Configurations:{bcolors.ENDC}")
+
+ risk_levels = settings.get('riskLevels', {})
+ risk_level_order = ['critical', 'high', 'medium', 'low']
+
+ for level in risk_level_order:
+ level_data = risk_levels.get(level)
+ if not level_data:
+ continue
+
+ terminate = level_data.get('aiSessionTerminate', False)
+ tags = level_data.get('tags', {})
+ allow_tags = tags.get('allow', [])
+ deny_tags = tags.get('deny', []) if level != 'low' else []
+
+ level_color = {
+ 'critical': bcolors.FAIL,
+ 'high': bcolors.WARNING,
+ 'medium': bcolors.OKBLUE,
+ 'low': bcolors.OKGREEN
+ }.get(level, bcolors.ENDC)
+
+ print(f"\n {level_color}{level.upper()}{bcolors.ENDC}:")
+ print(f" Terminate Session: {bcolors.OKGREEN if terminate else bcolors.WARNING}{terminate}{bcolors.ENDC}")
+
+ if allow_tags:
+ print(f" Allow Tags ({len(allow_tags)}):")
+ for tag_item in allow_tags:
+ tag_name = tag_item.get('tag', '') if isinstance(tag_item, dict) else str(tag_item)
+ print(f" - {tag_name}")
+
+ if deny_tags:
+ print(f" Deny Tags ({len(deny_tags)}):")
+ for tag_item in deny_tags:
+ tag_name = tag_item.get('tag', '') if isinstance(tag_item, dict) else str(tag_item)
+ print(f" - {tag_name}")
+
+ print()
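For reference, `print_keeper_ai_settings` expects roughly the following shape from `get_resource_keeper_ai_settings`. This is an illustrative sketch inferred from the keys read above, not an authoritative schema:

```python
# Illustrative shape only, based on the keys accessed by print_keeper_ai_settings.
example_settings = {
    'version': 1,
    'riskLevels': {
        'critical': {
            'aiSessionTerminate': True,
            'tags': {'allow': [{'tag': 'rm -rf'}], 'deny': []},
        },
        'low': {
            'aiSessionTerminate': False,
            'tags': {'allow': []},   # 'deny' is ignored for the 'low' level
        },
    },
}
```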
diff --git a/keepercommander/commands/pam_launch/launch.py b/keepercommander/commands/pam_launch/launch.py
index 767b600df..2a91dc3cd 100644
--- a/keepercommander/commands/pam_launch/launch.py
+++ b/keepercommander/commands/pam_launch/launch.py
@@ -381,27 +381,23 @@ def execute(self, params: KeeperParams, **kwargs):
logging.debug(f"Found gateway: {gateway_info['gateway_name']} ({gateway_info['gateway_uid']})")
logging.debug(f"Configuration: {gateway_info['config_uid']}")
- # Check if Gateway is online before attempting WebRTC connection
+ # Check whether the Gateway appears online; if it seems offline, log a warning and try to connect anyway.
try:
connected_gateways = router_get_connected_gateways(params)
- connected_gateway_uids = [x.controllerUid for x in connected_gateways.controllers]
- gateway_uid_bytes = url_safe_str_to_bytes(gateway_info['gateway_uid'])
-
- if gateway_uid_bytes not in connected_gateway_uids:
- raise CommandError(
- 'pam launch',
- f'Gateway "{gateway_info["gateway_name"]}" ({gateway_info["gateway_uid"]}) is currently offline. '
- f'Please start the Gateway before attempting to connect. '
- f'Use "pam gateway list" to check Gateway status.'
- )
-
- logging.debug(f"✓ Gateway is online and connected")
+ if connected_gateways and connected_gateways.controllers:
+ connected_gateway_uids = [x.controllerUid for x in connected_gateways.controllers]
+ gateway_uid_bytes = url_safe_str_to_bytes(gateway_info['gateway_uid'])
+ if gateway_uid_bytes not in connected_gateway_uids:
+ logging.warning(
+ 'Gateway "%s" (%s) seems offline - trying to connect anyway.',
+ gateway_info['gateway_name'], gateway_info['gateway_uid']
+ )
+ else:
+ logging.debug(f"✓ Gateway is online and connected")
+ else:
+ logging.warning('Gateway seems offline - trying to connect anyway.')
except Exception as e:
- # If router is down or there's an error checking status, still try to connect
- # (the connection attempt will fail later with a more specific error)
- if isinstance(e, CommandError):
- raise
- logging.warning(f"Could not verify Gateway online status: {e}. Continuing anyway...")
+ logging.debug('Could not verify gateway status: %s. Continuing...', e)
# Launch terminal connection
result = launch_terminal_connection(params, record_uid, gateway_info, **kwargs)
diff --git a/keepercommander/commands/record_edit.py b/keepercommander/commands/record_edit.py
index 9827e67d4..3cf81c329 100644
--- a/keepercommander/commands/record_edit.py
+++ b/keepercommander/commands/record_edit.py
@@ -188,6 +188,9 @@
$GEN oneTimeCode Generates TOTP URL
$GEN:[alg,][enc] keyPair Generates a key pair and $GEN:ec,enc
optional passcode alg: [rsa | ec | ed25519], enc
+$BASE64: any string field Decodes base64 value password=$BASE64:ZmpzemRmaGtkZg==
+ Useful for passwords with Decodes to: fjszdfhkdf
+ special characters
$JSON: any object Sets a field value as JSON
phone.Cell=$JSON:'{"number": "(555) 555-1234", "type": "Mobile"}'
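Producing the `$BASE64:` token only requires standard base64 encoding of the raw value. A quick sketch (pure stdlib; the resulting token is passed exactly as in the help text above):

```python
import base64

# Encode a password containing characters that are awkward on a command line.
password = 'p@$$w0rd with "quotes" & spaces'
token = '$BASE64:' + base64.b64encode(password.encode('utf-8')).decode('ascii')
print(token)  # use as: password=$BASE64:<encoded value>
```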
@@ -281,13 +284,20 @@ def assign_legacy_fields(self, record, fields):
if parsed_field.type == 'login':
record.login = parsed_field.value
elif parsed_field.type == 'password':
+ action_params.clear()
if self.is_generate_value(parsed_field.value, action_params):
record.password = self.generate_password(action_params)
+ elif self.is_base64_value(parsed_field.value, action_params):
+ if action_params:
+ record.password = action_params[0]
+ else:
+ logging.warning('Base64 decoding failed for password field')
else:
record.password = parsed_field.value
elif parsed_field.type == 'url':
record.link = parsed_field.value
elif parsed_field.type == 'oneTimeCode':
+ action_params.clear()
if self.is_generate_value(parsed_field.value, action_params):
record.totp = self.generate_totp_url()
else:
@@ -329,6 +339,22 @@ def is_generate_value(value, parameters): # type: (str, List[str]) -> Optiona
parameters.extend((x.strip() for x in gen_parameters.split(',')))
return True
+ @staticmethod
+ def is_base64_value(value, parameters): # type: (str, List[str]) -> Optional[bool]
+ """Check for the $BASE64: prefix; if present, decode the value and append the result to parameters."""
+ if value.startswith("$BASE64:"):
+ encoded_value = value[8:] # Skip "$BASE64:"
+ if encoded_value and isinstance(parameters, list):
+ try:
+ decoded_bytes = base64.b64decode(encoded_value)
+ decoded_str = decoded_bytes.decode('utf-8')
+ parameters.append(decoded_str)
+ return True
+ except Exception as e:
+ logging.warning(f'Failed to decode base64 value: {e}')
+ return True
+ return False
+
@staticmethod
def generate_key_pair(key_type, passphrase): # type: (str, str) -> dict
if key_type == 'ec':
@@ -572,6 +598,11 @@ def assign_typed_fields(self, record, fields):
parsed_fields.append(ParsedFieldValue('', 'password', 'passphrase', passphrase))
else:
self.on_warning(f'Cannot generate a value for a \"{record_field.type}\" field.')
+ elif self.is_base64_value(parsed_field.value, action_params):
+ if len(action_params) > 0:
+ value = action_params[0]
+ else:
+ self.on_warning(f'Base64 decoding failed for field \"{record_field.type}\".')
elif self.is_json_value(parsed_field.value, action_params):
if len(action_params) > 0:
value = self.validate_json_value(record_field.type, action_params[0])
diff --git a/keepercommander/commands/tunnel/port_forward/TunnelGraph.py b/keepercommander/commands/tunnel/port_forward/TunnelGraph.py
index a1c98c0db..44242b782 100644
--- a/keepercommander/commands/tunnel/port_forward/TunnelGraph.py
+++ b/keepercommander/commands/tunnel/port_forward/TunnelGraph.py
@@ -102,7 +102,8 @@ def _convert_allowed_setting(value):
def edit_tunneling_config(self, connections=None, tunneling=None,
rotation=None, session_recording=None,
typescript_recording=None,
- remote_browser_isolation=None):
+ remote_browser_isolation=None,
+ ai_enabled=None, ai_session_terminate=None):
config_vertex = self.linking_dag.get_vertex(self.record.record_uid)
if config_vertex is None:
config_vertex = self.linking_dag.add_vertex(uid=self.record.record_uid, vertex_type=RefType.PAM_NETWORK)
@@ -182,6 +183,24 @@ def edit_tunneling_config(self, connections=None, tunneling=None,
else:
allowed_settings["remoteBrowserIsolation"] = remote_browser_isolation
+ if ai_enabled is not None:
+ ai_enabled = self._convert_allowed_setting(ai_enabled)
+ if ai_enabled != allowed_settings.get("aiEnabled", None):
+ dirty = True
+ if ai_enabled is None:
+ allowed_settings.pop("aiEnabled", None)
+ else:
+ allowed_settings["aiEnabled"] = ai_enabled
+
+ if ai_session_terminate is not None:
+ ai_session_terminate = self._convert_allowed_setting(ai_session_terminate)
+ if ai_session_terminate != allowed_settings.get("aiSessionTerminate", None):
+ dirty = True
+ if ai_session_terminate is None:
+ allowed_settings.pop("aiSessionTerminate", None)
+ else:
+ allowed_settings["aiSessionTerminate"] = ai_session_terminate
+
if dirty:
config_vertex.add_data(content=content, path='meta', needs_encryption=False)
self.linking_dag.save()
@@ -405,6 +424,7 @@ def get_resource_setting(self, resource_uid: str, settings_name: str, setting: s
def set_resource_allowed(self, resource_uid, tunneling=None, connections=None, rotation=None,
session_recording=None, typescript_recording=None, remote_browser_isolation=None,
+ ai_enabled=None, ai_session_terminate=None,
allowed_settings_name='allowedSettings', is_config=False,
v_type: RefType=str(RefType.PAM_MACHINE)):
v_type = RefType(v_type)
@@ -491,6 +511,24 @@ def set_resource_allowed(self, resource_uid, tunneling=None, connections=None, r
else:
settings["remoteBrowserIsolation"] = remote_browser_isolation
+ if ai_enabled is not None:
+ ai_enabled = self._convert_allowed_setting(ai_enabled)
+ if ai_enabled != settings.get("aiEnabled", None):
+ dirty = True
+ if ai_enabled is None:
+ settings.pop("aiEnabled", None)
+ else:
+ settings["aiEnabled"] = ai_enabled
+
+ if ai_session_terminate is not None:
+ ai_session_terminate = self._convert_allowed_setting(ai_session_terminate)
+ if ai_session_terminate != settings.get("aiSessionTerminate", None):
+ dirty = True
+ if ai_session_terminate is None:
+ settings.pop("aiSessionTerminate", None)
+ else:
+ settings["aiSessionTerminate"] = ai_session_terminate
+
if dirty:
resource_vertex.add_data(content=content, path='meta', needs_encryption=False)
self.linking_dag.save()
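After these edits, the unencrypted `meta` content on the vertex carries the two new AI flags alongside the existing allowed settings, keyed by the allowed-settings name (default `'allowedSettings'`). An illustrative sketch of the resulting shape, showing only keys visible in this change:

```python
# Illustrative 'meta' content shape; only keys written in this change are shown.
meta_content = {
    'allowedSettings': {
        'remoteBrowserIsolation': False,
        'aiEnabled': True,             # added by this change
        'aiSessionTerminate': False,   # added by this change
        # ... existing connection/rotation/recording flags remain alongside these
    }
}
```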
diff --git a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py
index a7c752472..84930ff40 100644
--- a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py
+++ b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py
@@ -20,6 +20,7 @@
from ....proto import pam_pb2
from ....commands.base import FolderMixin
+from ....commands.pam.config_helper import configuration_controller_get
from ....commands.pam.pam_dto import GatewayAction, GatewayActionWebRTCSession
from ....commands.pam.router_helper import router_get_relay_access_creds, get_dag_leafs, \
get_router_ws_url, router_send_action_to_gateway
@@ -217,8 +218,6 @@ def __init__(self, tube_id, conversation_id, gateway_uid, symmetric_key,
# Optional attributes (set dynamically)
# Note: signal_handler is set after TunnelSignalHandler is created
self.signal_handler = None # type: ignore[assignment]
- # Note: gateway_ready_event is an optional threading.Event set if needed
- self.gateway_ready_event = None # type: ignore[assignment]
def update_activity(self):
"""Update last activity timestamp"""
@@ -395,6 +394,29 @@ def __init__(self, name, level=logging.NOTSET):
# Restore original logger class
logging.setLoggerClass(original_logger_class)
+ # Suppress noisy webrtc dependency logs even in debug mode (set to WARNING)
+ webrtc_crates = [
+ 'webrtc', 'webrtc_ice', 'webrtc_mdns', 'webrtc_dtls',
+ 'webrtc_sctp', 'turn', 'stun', 'webrtc_ice.agent.agent_internal',
+ 'webrtc_ice.agent.agent_gather', 'webrtc_ice.mdns',
+ 'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client'
+ ]
+ for crate_name in webrtc_crates:
+ crate_logger = logging.getLogger(crate_name)
+ crate_logger.setLevel(logging.WARNING)
+ crate_logger.propagate = False
+
+ # Suppress specific noisy keeper_pam_webrtc_rs sub-modules even in debug mode
+ # These log debug info at ERROR level, so we need to disable them entirely
+ noisy_keeper_loggers = [
+ 'keeper_pam_webrtc_rs.channel.core',
+ 'keeper_pam_webrtc_rs.python.utils'
+ ]
+ for logger_name in noisy_keeper_loggers:
+ noisy_logger = logging.getLogger(logger_name)
+ noisy_logger.setLevel(logging.CRITICAL + 1) # Disable completely
+ noisy_logger.propagate = False
+
logging.debug(f"Rust loggers enabled at DEBUG level")
enabled_loggers = [name for name in logging.Logger.manager.loggerDict.keys()
if isinstance(name, str) and name.startswith('keeper_pam_webrtc_rs')]
@@ -410,6 +432,29 @@ def __init__(self, name, level=logging.NOTSET):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.ERROR)
+ # Completely suppress specific noisy keeper_pam_webrtc_rs sub-modules
+ # These log debug info at ERROR level, so we need to disable them entirely
+ suppress_completely = [
+ 'keeper_pam_webrtc_rs.channel.core',
+ 'keeper_pam_webrtc_rs.python.utils'
+ ]
+ for logger_name in suppress_completely:
+ suppress_logger = logging.getLogger(logger_name)
+ suppress_logger.setLevel(logging.CRITICAL + 1) # Disable completely
+ suppress_logger.propagate = False
+
+ # Suppress noisy webrtc dependency logs when not debugging
+ webrtc_crates = [
+ 'webrtc', 'webrtc_ice', 'webrtc_mdns', 'webrtc_dtls',
+ 'webrtc_sctp', 'turn', 'stun', 'webrtc_ice.agent.agent_internal',
+ 'webrtc_ice.agent.agent_gather', 'webrtc_ice.mdns',
+ 'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client'
+ ]
+ for crate_name in webrtc_crates:
+ crate_logger = logging.getLogger(crate_name)
+ crate_logger.setLevel(logging.ERROR)
+ crate_logger.propagate = False
+
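If one of these crates does need to be inspected later, the suppression can be undone at runtime with the standard logging API. A small troubleshooting sketch (logger names taken from the lists above):

```python
import logging

# Re-enable a single suppressed dependency logger for troubleshooting.
ice_logger = logging.getLogger('webrtc_ice')
ice_logger.setLevel(logging.DEBUG)
ice_logger.propagate = True  # let its records reach the root handlers again
```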
def get_or_create_tube_registry(params):
"""Get or create the tube registry instance, storing it on params for reuse"""
@@ -623,6 +668,12 @@ def get_config_uid_from_record(params, vault, record_uid):
def get_gateway_uid_from_record(params, vault, record_uid):
+ """Resolve gateway UID for a PAM resource record (pamMachine, etc.).
+
+ Lookup flow: record_uid -> PAM_LINK DAG -> config_uid -> gateway (controller)
+ The gateway UID is read from the config record's pamResources.controllerUid; if that is
+ missing, it falls back to the pam/get_configuration_controller API.
+ """
gateway_uid = ''
pam_config_uid = get_config_uid_from_record(params, vault, record_uid)
if pam_config_uid:
@@ -633,6 +684,16 @@ def get_gateway_uid_from_record(params, vault, record_uid):
if value:
gateway_uid = value.get('controllerUid', '') or ''
+ # Fallback: ask server for controller when config record has no local controllerUid
+ if not gateway_uid:
+ try:
+ config_uid_bytes = url_safe_str_to_bytes(pam_config_uid)
+ controller = configuration_controller_get(params, config_uid_bytes)
+ if controller and controller.controllerUid:
+ gateway_uid = utils.base64_url_encode(controller.controllerUid)
+ except Exception as e:
+ logging.debug('get_gateway_uid_from_record: get_configuration_controller fallback failed: %s', e)
+
return gateway_uid
@@ -803,7 +864,7 @@ async def connect_websocket_with_fallback(ws_endpoint, headers, ssl_context, tub
}
if WEBSOCKETS_VERSION == "asyncio":
- # websockets 15.0.1+ uses additional_headers and ssl_context/ssl parameters
+ # websockets 15.0.1+ uses additional_headers and ssl parameters
connect_kwargs = {
**base_kwargs,
"additional_headers": headers
@@ -1101,8 +1162,6 @@ def route_message_to_rust(response_item, tube_registry):
# Send any buffered local ICE candidates now that we have the answer
session = get_tunnel_session(tube_id)
if session:
- session.gateway_ready_event.set()
-
# Send any buffered local ICE candidates now that we have the answer
if session.buffered_ice_candidates:
logging.debug(f"Sending {len(session.buffered_ice_candidates)} buffered ICE candidates after answer")
@@ -1360,10 +1419,13 @@ def __init__(self, params, record_uid, gateway_uid, symmetric_key, base64_nonce,
self.symmetric_key = symmetric_key
self.base64_nonce = base64_nonce
self.conversation_id = conversation_id
+ self.conversation_type = conversation_type
self.tube_registry = tube_registry
self.tube_id = tube_id
self.trickle_ice = trickle_ice
self.connection_success_shown = False # Track if we've shown success messages
+ self.connection_connected = False # Track if WebRTC connection is established
+ self.ice_sending_in_progress = False # Serialize ICE candidate sending
self.host = None # Will be set later when the socket is ready
self.port = None
self.websocket_router = websocket_router # For key cleanup
@@ -1421,23 +1483,23 @@ def signal_from_rust(self, response: dict):
# Get tunnel session for record details
if session:
- logging.debug(f"\n{bcolors.OKGREEN}Connection established successfully.{bcolors.ENDC}")
+ logging.info(f"\n{bcolors.OKGREEN}Connection established successfully.{bcolors.ENDC}")
# Display record title if available
if session.record_title:
- logging.debug(f"{bcolors.OKBLUE}Record:{bcolors.ENDC} {session.record_title}")
+ logging.info(f"{bcolors.OKBLUE}Record:{bcolors.ENDC} {session.record_title}")
# Display remote target
if session.target_host and session.target_port:
- logging.debug(f"{bcolors.OKBLUE}Remote:{bcolors.ENDC} {session.target_host}:{session.target_port}")
+ logging.info(f"{bcolors.OKBLUE}Remote:{bcolors.ENDC} {session.target_host}:{session.target_port}")
# Display local listening address
if session.host and session.port:
- logging.debug(f"{bcolors.OKBLUE}Local:{bcolors.ENDC} {session.host}:{session.port}")
+ logging.info(f"{bcolors.OKBLUE}Local:{bcolors.ENDC} {session.host}:{session.port}")
# Display conversation ID
if session.conversation_id:
- logging.debug(f"{bcolors.OKBLUE}Conversation ID:{bcolors.ENDC} {session.conversation_id}")
+ logging.info(f"{bcolors.OKBLUE}Conversation ID:{bcolors.ENDC} {session.conversation_id}")
# Flush any buffered ICE candidates now that we're connected
if session and session.buffered_ice_candidates:
@@ -2088,7 +2150,7 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port,
# DEDICATED WebSocket: No mutex needed! Each tunnel has its own connection.
# Backend registration is independent - no contention, no delays needed.
# Just a small delay to ensure backend is ready (much shorter than before)
- backend_registration_delay = float(os.getenv('WEBSOCKET_BACKEND_DELAY', '0.5'))
+ backend_registration_delay = float(os.getenv('WEBSOCKET_BACKEND_DELAY', '2.0'))
logging.debug(f"Dedicated WebSocket: Waiting {backend_registration_delay}s for backend to register conversation {conversation_id_original}...")
time.sleep(backend_registration_delay)
logging.debug("Backend conversation registration delay complete")
diff --git a/keepercommander/service/README.md b/keepercommander/service/README.md
index bfb70e26b..7f1fbdc15 100644
--- a/keepercommander/service/README.md
+++ b/keepercommander/service/README.md
@@ -95,6 +95,7 @@ Parameters:
- `-rl, --ratelimit`: Rate limit (e.g., "10/minute")
- `-ek, --encryption_key`: Encryption key for response encryption (automatically enables encryption)
- `-te, --token_expiration`: Token expiration time (e.g., "30m", "24h", "7d")
+- `-ur, --update-vault-record`: Config record UID to update with service metadata (Docker mode - stores API key and service URL)
### Service Management
diff --git a/keepercommander/service/commands/slack_app_setup.py b/keepercommander/service/commands/slack_app_setup.py
index 0b3f26d8a..d08cb2959 100644
--- a/keepercommander/service/commands/slack_app_setup.py
+++ b/keepercommander/service/commands/slack_app_setup.py
@@ -155,7 +155,7 @@ def _get_slack_service_configuration(self) -> ServiceConfig:
return ServiceConfig(
port=port,
- commands='search,share-record,share-folder,record-add,one-time-share,pedm,device-approve,get,server',
+ commands='search,share-record,share-folder,record-add,one-time-share,epm,pedm,device-approve,get,server',
queue_enabled=True, # Always enable queue mode (v2 API)
ngrok_enabled=ngrok_config['ngrok_enabled'],
ngrok_auth_token=ngrok_config['ngrok_auth_token'],
diff --git a/keepercommander/service/config/cli_handler.py b/keepercommander/service/config/cli_handler.py
index d0867ce82..9329c22a0 100644
--- a/keepercommander/service/config/cli_handler.py
+++ b/keepercommander/service/config/cli_handler.py
@@ -41,15 +41,27 @@ def execute_cli_command(self, params: KeeperParams, command: str) -> str:
@debug_decorator
def find_config_record(self, params: KeeperParams, title: str) -> Optional[str]:
- """Find existing config record and return its UID."""
+ """Find existing config record by exact title match using vault search."""
try:
- output = self.execute_cli_command(params, f"search -v '{title}'")
- if uid_match := re.search(r'UID: ([a-zA-Z0-9_-]+)', output):
- return uid_match.group(1)
+ from ... import vault_extensions
+
+ logger.debug(f"Searching for record with exact title: '{title}'")
+ records = list(vault_extensions.find_records(params, title))
+
+ # Filter to exact title match only
+ for record in records:
+ logger.debug(f"Checking record: '{record.title}' (UID: {record.record_uid})")
+ if record.title == title:
+ logger.debug(f"✓ Found exact title match: '{title}' (UID: {record.record_uid})")
+ return record.record_uid
+
+ logger.debug(f"✗ No record found with exact title: '{title}'")
+ return None
+
except Exception as e:
logger.error(f"Error searching for record: {e}")
print(f"Error searching for record: {e}")
- return None
+ return None
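A hedged caller sketch for the stricter lookup; `handler` and `create_service_config_record` are hypothetical names used only to illustrate handling the `None` result:

```python
# Hypothetical caller: exact-title lookup, falling back to creating the record.
record_uid = handler.find_config_record(params, 'Commander Service Config')  # 'handler' assumed
if record_uid is None:
    record_uid = create_service_config_record(params)  # hypothetical helper
```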
@debug_decorator
def get_help_output(self, params: KeeperParams) -> str:
@@ -62,7 +74,7 @@ def get_help_output(self, params: KeeperParams) -> str:
params.service_mode = True
from ... import cli
- cli.do_command(params, 'help')
+ cli.display_command_help(show_enterprise=True, show_shell=False, show_legacy=False)
return output.getvalue()
finally:
sys.stdout = sys.__stdout__
diff --git a/keepercommander/service/config/command_validator.py b/keepercommander/service/config/command_validator.py
index 24f5a2907..359d11d3f 100644
--- a/keepercommander/service/config/command_validator.py
+++ b/keepercommander/service/config/command_validator.py
@@ -92,40 +92,58 @@ def _process_new_command_line(self, line: str, valid_commands: Set,
command_info: Dict, category: str) -> None:
"""Process a command line from new categorized help output."""
# Extract command and alias from patterns like:
+ # "command (alias1, alias2) description"
# "command (alias) description"
# "command description"
- # Split on whitespace to separate command from description
- parts = line.split()
+ main_command = None
+ aliases_str = None
+
+ # Split line to get just the command part (before description)
+ # Aliases should appear immediately after the command, before multiple spaces
+ parts = line.split(None, 1) # Split on first whitespace
if not parts:
return
-
- main_command = parts[0]
- alias = None
- if len(parts) >= 3 and parts[2].startswith('(') and parts[2].endswith(')'):
- main_command = f"{parts[0]} {parts[1]}"
- alias = parts[2][1:-1].strip()
- elif len(parts) > 1 and parts[1].startswith('(') and parts[1].endswith(')'):
- # Extract alias: "(alias)" -> "alias"
- alias = parts[1][1:-1].strip()
- # Check if command and alias are combined: "command (alias)"
- elif '(' in main_command and ')' in main_command:
- # Extract main command and alias: "command (alias)"
- command_alias_part = main_command
- main_command = command_alias_part.split('(')[0].strip()
- alias_with_paren = command_alias_part.split('(')[1]
- alias = alias_with_paren.split(')')[0].strip()
+ command_part = parts[0]
+
+ # Check if the command part contains parentheses for aliases
+ # Pattern: "command" or "command(alias)" or "command (alias)"
+ if '(' in command_part and ')' in command_part:
+ # Extract command and aliases from the first token
+ paren_start = command_part.index('(')
+ paren_end = command_part.index(')', paren_start)
+
+ main_command = command_part[:paren_start].strip()
+ aliases_str = command_part[paren_start+1:paren_end].strip()
+ else:
+ # No parentheses in first token, check if second token is aliases
+ # Pattern: "command (alias1, alias2) description"
+ if len(parts) > 1:
+ rest = parts[1].lstrip()
+ if rest.startswith('(') and ')' in rest:
+ # Find the closing parenthesis
+ paren_end = rest.index(')')
+ aliases_str = rest[1:paren_end].strip()
+ main_command = command_part
+ else:
+ main_command = command_part
+ else:
+ main_command = command_part
# Add main command
if main_command:
valid_commands.add(main_command)
command_info[main_command] = {'category': category}
- # Add alias if found
- if alias:
- valid_commands.add(alias)
- command_info[alias] = {'category': category, 'main_command': main_command}
+ # Add alias(es) if found - handle multiple comma-separated aliases
+ if aliases_str:
+ # Split on commas to handle multiple aliases like "pedm, kepm"
+ aliases_list = [a.strip() for a in aliases_str.split(',')]
+ for single_alias in aliases_list:
+ if single_alias: # Skip empty strings
+ valid_commands.add(single_alias)
+ command_info[single_alias] = {'category': category, 'main_command': main_command}
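The parser above is meant to accept both the single-alias and comma-separated forms. A simplified, standalone re-implementation for illustration only (not the validator itself), showing the inputs it should handle:

```python
import re

def split_command_and_aliases(line):
    """Illustration: 'pedm (epm, kepm)  PEDM commands' -> ('pedm', ['epm', 'kepm'])."""
    m = re.match(r'^(\S+?)\s*\(([^)]*)\)', line) or re.match(r'^(\S+)', line)
    if not m:
        return None, []
    command = m.group(1)
    aliases = [a.strip() for a in m.group(2).split(',')] if m.lastindex and m.lastindex >= 2 else []
    return command, [a for a in aliases if a]

assert split_command_and_aliases('pedm (epm, kepm)  PEDM commands') == ('pedm', ['epm', 'kepm'])
assert split_command_and_aliases('search  Search the vault') == ('search', [])
```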
def validate_command_list(self, commands: str, valid_commands: Set) -> str:
"""Validate input commands against valid commands."""
@@ -148,7 +166,16 @@ def generate_command_error_message(self, invalid_commands: List[str], command_in
"Available commands:"
]
- # Group commands by category, handling the new category names
+ # Build a map of main commands to their aliases
+ command_aliases = {}
+ for cmd, info in command_info.items():
+ if 'main_command' in info:
+ main_cmd = info['main_command']
+ if main_cmd not in command_aliases:
+ command_aliases[main_cmd] = []
+ command_aliases[main_cmd].append(cmd)
+
+ # Group commands by category
category_commands = {}
for cmd, info in command_info.items():
@@ -157,22 +184,21 @@ def generate_command_error_message(self, invalid_commands: List[str], command_in
category = info.get('category', 'Other')
if category not in category_commands:
category_commands[category] = []
- category_commands[category].append(cmd)
+
+ # Format command with aliases if they exist
+ aliases = command_aliases.get(cmd, [])
+ if aliases:
+ cmd_display = f"{cmd} ({', '.join(sorted(aliases))})"
+ else:
+ cmd_display = cmd
+ category_commands[category].append(cmd_display)
# Sort categories and display commands
for category in sorted(category_commands.keys()):
- commands = category_commands[category]
+ commands = sorted(category_commands[category])
if commands:
error_msg.append(f"\n{category}:")
- sorted_commands = sorted(commands)
- command_lines = []
- for i in range(0, len(sorted_commands), 12):
- command_lines.append(", ".join(sorted_commands[i:i+12]))
- try:
- _ = sorted_commands[i+13]
- command_lines[-1] += (", ")
- except IndexError:
- pass
- error_msg.extend(command_lines)
+ # Join all commands with comma-space, no artificial line breaks
+ error_msg.append(" " + ", ".join(commands))
return "\n".join(error_msg)
\ No newline at end of file
diff --git a/keepercommander/service/docker/setup_base.py b/keepercommander/service/docker/setup_base.py
index c87a544ba..9b0457b72 100644
--- a/keepercommander/service/docker/setup_base.py
+++ b/keepercommander/service/docker/setup_base.py
@@ -189,8 +189,10 @@ def _upload_config_file(self, params, record_uid: str, config_path: str) -> None
temp_config_path = cleaned_config_path
record = vault.KeeperRecord.load(params, record_uid)
- if not isinstance(record, (vault.PasswordRecord, vault.TypedRecord)):
+ if not isinstance(record, vault.TypedRecord):
raise CommandError('docker-setup', 'Invalid record type for attachments')
+ # Delete existing config.json attachments to prevent duplicates
+ self._delete_existing_config_attachments(record, params)
# Upload attachment
upload_task = attachment.FileUploadTask(cleaned_config_path)
@@ -213,6 +215,27 @@ def _upload_config_file(self, params, record_uid: str, config_path: str) -> None
print(f"Warning: Could not delete temporary config file: {e}")
pass
+ def _delete_existing_config_attachments(self, record, params) -> None:
+ """Delete any existing config.json attachments to prevent duplicates"""
+ # Modern records use TypedRecord with fileRef system
+ from ...record_facades import FileRefRecordFacade
+ facade = FileRefRecordFacade()
+ facade.record = record
+
+ file_uids_to_remove = []
+ for file_uid in facade.file_ref:
+ if file_uid in params.record_cache:
+ file_record = vault.KeeperRecord.load(params, file_uid)
+ if isinstance(file_record, vault.FileRecord):
+ if file_record.name.lower() == 'config.json' or file_record.title.lower() == 'config.json':
+ file_uids_to_remove.append(file_uid)
+
+ if file_uids_to_remove:
+ for file_uid in file_uids_to_remove:
+ facade.file_ref.remove(file_uid)
+ DockerSetupPrinter.print_success(f"Removed {len(file_uids_to_remove)} existing config.json attachment(s)")
+
+
def _clean_config_json(self, config_path: str) -> str:
"""Clean config.json by keeping only essential authentication keys"""
try:
diff --git a/keepercommander/vault.py b/keepercommander/vault.py
index 365e01530..bbe688af7 100644
--- a/keepercommander/vault.py
+++ b/keepercommander/vault.py
@@ -876,8 +876,8 @@ def load_record_data(self, data, extra=None):
self.type_name = sanitize_str_field_value(data.get('type')).strip()
self.title = sanitize_str_field_value(data.get('title')).strip()
self.notes = sanitize_str_field_value(data.get('notes'))
- self.fields.extend((TypedField(x) for x in data.get('fields', [])))
- self.custom.extend((TypedField(x) for x in data.get('custom', [])))
+ self.fields.extend((TypedField(x) for x in (data.get('fields') or [])))
+ self.custom.extend((TypedField(x) for x in (data.get('custom') or [])))
def enumerate_fields(self):
# type: () -> Iterable[Tuple[str, Union[None, str, List[str]]]]
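The `or []` guard matters when the decrypted record data contains the key with an explicit null, where a `dict.get` default does not apply. A minimal illustration:

```python
data = {'fields': None, 'custom': None}  # e.g. record data JSON with explicit nulls

# The default is ignored because the key exists:
assert data.get('fields', []) is None

# The guarded form used above falls back to an empty list either way:
assert (data.get('fields') or []) == []
```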