diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 185f60d..68d2a59 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -5,9 +5,9 @@ name: Python package on: push: - branches: [ master ] + branches: [ master, main, 'v*' ] pull_request: - branches: [ master ] + branches: [ master, main ] jobs: build: @@ -16,12 +16,12 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.10", "3.11", "3.12", "3.13"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/privx_api/api_proxy.py b/privx_api/api_proxy.py new file mode 100644 index 0000000..e7d4caf --- /dev/null +++ b/privx_api/api_proxy.py @@ -0,0 +1,387 @@ +from http import HTTPStatus +from typing import Optional + +from privx_api.base import BasePrivXAPI +from privx_api.enums import UrlEnum +from privx_api.response import PrivXAPIResponse +from privx_api.utils import get_value + + +class ApiProxyAPI(BasePrivXAPI): + def get_api_proxy_status(self) -> PrivXAPIResponse: + """ + Get API-Proxy microservice status. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_get(UrlEnum.API_PROXY.STATUS) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def get_api_proxy_configuration(self) -> PrivXAPIResponse: + """ + Fetch API-Proxy configuration. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_get(UrlEnum.API_PROXY.CONFIGURATION) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def create_api_target(self, api_target: dict) -> PrivXAPIResponse: + """ + Create a new API target. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_post( + UrlEnum.API_PROXY.API_TARGETS, + body=api_target, + ) + return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + + def get_api_targets( + self, + offset: Optional[int] = None, + limit: Optional[int] = None, + sort_key: Optional[str] = None, + sort_dir: Optional[str] = None, + filter_param: Optional[str] = None, + ) -> PrivXAPIResponse: + """ + List API targets. + + Returns: + PrivXAPIResponse + """ + query_params = self._get_search_params( + offset=offset, + limit=limit, + sortkey=sort_key, + sortdir=sort_dir, + filter=filter_param, + ) + response_status, data = self._http_get( + UrlEnum.API_PROXY.API_TARGETS, + query_params=query_params, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def get_api_target(self, api_target_id: str) -> PrivXAPIResponse: + """ + Fetch a single API target by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_get( + UrlEnum.API_PROXY.API_TARGET, + path_params={"api_target_id": api_target_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def update_api_target( + self, + api_target_id: str, + api_target: dict, + ) -> PrivXAPIResponse: + """ + Update an API target by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_put( + UrlEnum.API_PROXY.API_TARGET, + path_params={"api_target_id": api_target_id}, + body=api_target, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def delete_api_target(self, api_target_id: str) -> PrivXAPIResponse: + """ + Delete an API target by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_delete( + UrlEnum.API_PROXY.API_TARGET, + path_params={"api_target_id": api_target_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def search_api_targets( + self, + offset: Optional[int] = None, + limit: Optional[int] = None, + sort_key: Optional[str] = None, + sort_dir: Optional[str] = None, + filter_param: Optional[str] = None, + search_payload: Optional[dict] = None, + ) -> PrivXAPIResponse: + """ + Search for API targets with optional criteria. + + Returns: + PrivXAPIResponse + """ + query_params = self._get_search_params( + offset=offset, + limit=limit, + sortkey=sort_key, + sortdir=sort_dir, + filter=filter_param, + ) + response_status, data = self._http_post( + UrlEnum.API_PROXY.API_TARGET_SEARCH, + query_params=query_params, + body=get_value(search_payload, dict()), + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def get_api_target_tags( + self, + offset: Optional[int] = None, + limit: Optional[int] = None, + sort_dir: Optional[str] = None, + ) -> PrivXAPIResponse: + """ + List tags defined for API targets. + + Returns: + PrivXAPIResponse + """ + query_params = self._get_search_params( + offset=offset, + limit=limit, + sortdir=sort_dir, + ) + response_status, data = self._http_get( + UrlEnum.API_PROXY.API_TARGET_TAGS, + query_params=query_params, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def create_current_user_client_credential( + self, + credential: dict, + ) -> PrivXAPIResponse: + """ + Create a client credential for the current user. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_post( + UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIALS, + body=credential, + ) + return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + + def get_current_user_client_credentials( + self, + offset: Optional[int] = None, + limit: Optional[int] = None, + sort_key: Optional[str] = None, + sort_dir: Optional[str] = None, + ) -> PrivXAPIResponse: + """ + List client credentials owned by the current user. + + Returns: + PrivXAPIResponse + """ + query_params = self._get_search_params( + offset=offset, + limit=limit, + sortkey=sort_key, + sortdir=sort_dir, + ) + response_status, data = self._http_get( + UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIALS, + query_params=query_params, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def get_current_user_client_credential( + self, + credential_id: str, + ) -> PrivXAPIResponse: + """ + Fetch a current user's client credential by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_get( + UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIAL, + path_params={"credential_id": credential_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def update_current_user_client_credential( + self, + credential_id: str, + credential: dict, + ) -> PrivXAPIResponse: + """ + Update a client credential owned by the current user. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_put( + UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIAL, + path_params={"credential_id": credential_id}, + body=credential, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def delete_current_user_client_credential( + self, + credential_id: str, + ) -> PrivXAPIResponse: + """ + Delete a current user's client credential by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_delete( + UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIAL, + path_params={"credential_id": credential_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def get_current_user_client_credential_secret( + self, + credential_id: str, + ) -> PrivXAPIResponse: + """ + Get a current user's client credential secret. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_get( + UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIAL_SECRET, + path_params={"credential_id": credential_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def create_user_client_credential( + self, + user_id: str, + credential: dict, + ) -> PrivXAPIResponse: + """ + Create a client credential for a specific user. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_post( + UrlEnum.API_PROXY.USER_CLIENT_CREDENTIALS, + path_params={"user_id": user_id}, + body=credential, + ) + return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + + def get_user_client_credentials( + self, + user_id: str, + offset: Optional[int] = None, + limit: Optional[int] = None, + sort_key: Optional[str] = None, + sort_dir: Optional[str] = None, + ) -> PrivXAPIResponse: + """ + List client credentials for a specific user. + + Returns: + PrivXAPIResponse + """ + query_params = self._get_search_params( + offset=offset, + limit=limit, + sortkey=sort_key, + sortdir=sort_dir, + ) + response_status, data = self._http_get( + UrlEnum.API_PROXY.USER_CLIENT_CREDENTIALS, + path_params={"user_id": user_id}, + query_params=query_params, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def get_user_client_credential( + self, + user_id: str, + credential_id: str, + ) -> PrivXAPIResponse: + """ + Fetch a client credential for a specific user by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_get( + UrlEnum.API_PROXY.USER_CLIENT_CREDENTIAL, + path_params={"user_id": user_id, "credential_id": credential_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def update_user_client_credential( + self, + user_id: str, + credential_id: str, + credential: dict, + ) -> PrivXAPIResponse: + """ + Update a user-owned client credential by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_put( + UrlEnum.API_PROXY.USER_CLIENT_CREDENTIAL, + path_params={"user_id": user_id, "credential_id": credential_id}, + body=credential, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def delete_user_client_credential( + self, + user_id: str, + credential_id: str, + ) -> PrivXAPIResponse: + """ + Delete a user-owned client credential by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_delete( + UrlEnum.API_PROXY.USER_CLIENT_CREDENTIAL, + path_params={"user_id": user_id, "credential_id": credential_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def get_user_client_credential_secret( + self, + user_id: str, + credential_id: str, + ) -> PrivXAPIResponse: + """ + Fetch a user-owned client credential secret by ID. + + Returns: + PrivXAPIResponse + """ + response_status, data = self._http_get( + UrlEnum.API_PROXY.USER_CLIENT_CREDENTIAL_SECRET, + path_params={"user_id": user_id, "credential_id": credential_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) diff --git a/privx_api/auth.py b/privx_api/auth.py index 9b76668..60c5176 100644 --- a/privx_api/auth.py +++ b/privx_api/auth.py @@ -117,7 +117,6 @@ def get_user_sessions( query_params=search_params, path_params={"user_id": user_id}, ) - print(response_status, data) return PrivXAPIResponse(response_status, HTTPStatus.OK, data) def get_source_sessions( diff --git a/privx_api/base.py b/privx_api/base.py index ed64c75..099c41a 100644 --- a/privx_api/base.py +++ b/privx_api/base.py @@ -9,6 +9,7 @@ from json import JSONDecodeError from typing import Optional, Tuple, Union +from privx_api.cookie_jar import RoutingCookieJar from privx_api.enums import NO_AUTH_STATUS_URLS, UrlEnum from privx_api.exceptions import InternalAPIException @@ -85,8 +86,7 @@ def __init__( self._re_auth_deadline = None self._access_token_age = None self._re_auth_margin = re_auth_margin - self._use_cookies = use_cookies - self._cookies = None + self._cookie_jar = RoutingCookieJar() if use_cookies else None def _authenticate(self, username: str, password: str) -> None: # saving the creds for the re-auth purposes @@ -107,16 +107,18 @@ def _authenticate(self, username: str, password: str) -> None: "Content-type": "application/x-www-form-urlencoded", "Authorization": "Basic {}".format(basic_auth.decode("utf-8")), } + token_url = self._get_url(UrlEnum.AUTH.TOKEN) try: conn.request( "POST", - self._get_url(UrlEnum.AUTH.TOKEN), + token_url, body=urllib.parse.urlencode(token_request), headers=headers, ) except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + self._store_response_cookies(response, token_url) if response.status != 200: raise InternalAPIException("Invalid response: ", response.status) @@ -125,11 +127,6 @@ def _authenticate(self, username: str, password: str) -> None: except (JSONDecodeError, TypeError) as e: raise InternalAPIException(e) from e - # save and never change cookie`s value, - # in order to communicate with the same node - if self._cookies is None and self._use_cookies: - self._cookies = response.getheader("Set-Cookie") - # privx response includes access token age in seconds self._access_token_age = data.get("expires_in") self._re_auth_deadline = ( @@ -151,8 +148,8 @@ def _build_request( path_params = path_params or {} query_params = query_params or {} self._reauthenticate_access_token(url_name) - headers = self._get_headers() url = self._build_url(url_name, path_params, query_params) + headers = self._get_headers(url) request_dict = dict(method=method, url=url, headers=headers) if body is not None: request_dict["body"] = self._make_body_params(body) @@ -176,14 +173,18 @@ def _build_url( return url - def _get_headers(self) -> dict: + def _get_headers(self, url: str) -> dict: headers = { "Content-type": "application/json", } if self._access_token: headers["Authorization"] = "Bearer {}".format(self._access_token) - if self._cookies: - headers["Cookie"] = self._cookies + if self._cookie_jar: + cookie_header = self._cookie_jar.get_header( + self._connection_info["host"], url + ) + if cookie_header: + headers["Cookie"] = cookie_header return headers def _get_search_params(self, **kwargs: Union[str, int]) -> dict: @@ -204,24 +205,24 @@ def _http_get( ) -> Tuple: with Connection(self._connection_info) as conn: + request = self._build_request( + "GET", + url_name, + path_params, + query_params, + ) try: - conn.request( - **self._build_request( - "GET", - url_name, - path_params, - query_params, - ) - ) + conn.request(**request) except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + self._store_response_cookies(response, request["url"]) return response.status, response.read() def _http_get_no_auth(self, url_name: str) -> Tuple: request = self._build_request("GET", url_name) headers = request["headers"] - del headers["Authorization"] + headers.pop("Authorization", None) with Connection(self._connection_info) as conn: try: @@ -229,6 +230,7 @@ def _http_get_no_auth(self, url_name: str) -> Tuple: except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + self._store_response_cookies(response, request["url"]) return response.status, response.read() def _http_post( @@ -240,19 +242,19 @@ def _http_post( ) -> Tuple: with Connection(self._connection_info) as conn: + request = self._build_request( + "POST", + url_name, + path_params, + query_params, + body=body, + ) try: - conn.request( - **self._build_request( - "POST", - url_name, - path_params, - query_params, - body=body, - ) - ) + conn.request(**request) except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + self._store_response_cookies(response, request["url"]) return response.status, response.read() def _http_put( @@ -264,19 +266,19 @@ def _http_put( ) -> Tuple: with Connection(self._connection_info) as conn: + request = self._build_request( + "PUT", + url_name, + path_params, + query_params, + body=body, + ) try: - conn.request( - **self._build_request( - "PUT", - url_name, - path_params, - query_params, - body=body, - ) - ) + conn.request(**request) except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + self._store_response_cookies(response, request["url"]) return response.status, response.read() def _http_delete( @@ -288,19 +290,19 @@ def _http_delete( ) -> Tuple: with Connection(self._connection_info) as conn: + request = self._build_request( + "DELETE", + url_name, + path_params, + query_params, + body=body, + ) try: - conn.request( - **self._build_request( - "DELETE", - url_name, - path_params, - query_params, - body=body, - ) - ) + conn.request(**request) except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + self._store_response_cookies(response, request["url"]) return response.status, response.read() def _http_stream( @@ -311,19 +313,20 @@ def _http_stream( query_params: Optional[dict] = None, ) -> HTTPResponse: conn = Connection(self._connection_info).connect() + request = self._build_request( + "GET", + url_name, + path_params, + query_params, + body=body, + ) try: - conn.request( - **self._build_request( - "GET", - url_name, - path_params, - query_params, - body=body, - ) - ) + conn.request(**request) except (OSError, HTTPException) as e: raise InternalAPIException(e) - return conn.getresponse() + response = conn.getresponse() + self._store_response_cookies(response, request["url"]) + return response def _make_body_params(self, data: Union[dict, str]) -> str: return data if isinstance(data, str) else json.dumps(data) @@ -349,3 +352,25 @@ def _initialize_api_client_credentials(self, username: str, password: str): raise InternalAPIException("api client credentials are not valid") self._api_client_id = username self._api_client_password = password + + def _store_response_cookies( + self, response: HTTPResponse, request_path: str + ) -> None: + if not self._cookie_jar: + return + + headers = [] + if getattr(response, "headers", None): + headers = response.headers.get_all("Set-Cookie") or [] + if not headers and getattr(response, "msg", None): + headers = response.msg.get_all("Set-Cookie") or [] + if not headers: + header = response.getheader("Set-Cookie") + headers = [header] if header else [] + + if headers: + self._cookie_jar.store( + headers, + self._connection_info["host"], + request_path, + ) diff --git a/privx_api/cookie_jar.py b/privx_api/cookie_jar.py new file mode 100644 index 0000000..4f28ae6 --- /dev/null +++ b/privx_api/cookie_jar.py @@ -0,0 +1,147 @@ +import time +from datetime import datetime, timezone +from http import cookies +from typing import Iterable, Optional + + +class RoutingCookieJar: + """ + Minimal cookie jar to persist node-affinity cookies between requests. + """ + + def __init__(self) -> None: + # (domain, path, name) -> {"value": str, "expires": Optional[float]} + self._cookies = {} + + def store( + self, set_cookie_headers: Iterable[str], host: str, request_path: str + ) -> None: + if not set_cookie_headers: + return + # cur query params + request_path = request_path.split("?", 1)[0] or "/" + default_path = self._default_path(request_path) + for header in set_cookie_headers: + simple = cookies.SimpleCookie() + try: + simple.load(header) + except (cookies.CookieError, KeyError): + continue + for morsel in simple.values(): + domain = self._normalize_domain(morsel["domain"], host) + path = morsel["path"] or default_path + expires = self._parse_expiry(morsel) + key = (domain, path, morsel.key) + self._cookies[key] = {"value": morsel.value, "expires": expires} + + def get_header(self, host: str, request_path: str) -> Optional[str]: + if not self._cookies: + return None + + now = time.time() + request_path = request_path.split("?", 1)[0] or "/" + pairs = [] + expired = [] + for (domain, path, name), meta in self._cookies.items(): + expires = meta["expires"] + if expires is not None and expires <= now: + expired.append((domain, path, name)) + continue + if not self._domain_matches(host, domain): + continue + if not self._path_matches(request_path, path): + continue + pairs.append(f"{name}={meta['value']}") + + for key in expired: + self._cookies.pop(key, None) + + return "; ".join(pairs) if pairs else None + + @staticmethod + def _default_path(request_path: str) -> str: + # https://www.rfc-editor.org/rfc/rfc6265#section-5.1.4 defines how default path + # should be created + request_path = request_path or "/" + if not request_path.startswith("/"): + return "/" + if request_path == "/": + return "/" + slash_index = request_path.rfind("/") + if slash_index <= 0: + return "/" + return request_path[:slash_index] + + @staticmethod + def _normalize_domain(cookie_domain: str, host: str) -> str: + if cookie_domain: + return cookie_domain.lstrip(".").lower() + return (host or "").lower() + + @staticmethod + def _parse_expiry(morsel: cookies.Morsel) -> Optional[float]: + max_age = morsel["max-age"] + if max_age: + try: + return time.time() + int(max_age) + except ValueError: + return None + expires = morsel["expires"] + if expires: + return RoutingCookieJar._parse_http_date(expires) + + return None + + @staticmethod + def _parse_http_date(date_str: str) -> Optional[float]: + date_str = (date_str or "").strip() + if not date_str: + return None + formats = ( + ("%a, %d %b %Y %H:%M:%S GMT", False), # IMF-fixdate (RFC 7231) + ("%a, %d-%b-%Y %H:%M:%S GMT", False), # RFC 850 but 4-digit year + ("%a, %d-%b-%y %H:%M:%S GMT", True), # RFC 850 two-digit year + ("%A, %d-%b-%y %H:%M:%S GMT", True), # RFC 850 weekday spelled out + ) + for fmt, needs_adjust in formats: + try: + parsed = datetime.strptime(date_str, fmt) + except (TypeError, ValueError): + continue + if needs_adjust: + parsed = RoutingCookieJar._adjust_two_digit_year(parsed) + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.timestamp() + return None + + @staticmethod + def _adjust_two_digit_year(dt: datetime) -> datetime: + year = dt.year + if year < 100: + if year < 70: + year += 2000 + else: + year += 1900 + return dt.replace(year=year) + + @staticmethod + def _domain_matches(host: str, domain: str) -> bool: + host = (host or "").lower() + domain = domain or "" + if not host or not domain: + return False + return host == domain or host.endswith(f".{domain}") + + @staticmethod + def _path_matches(request_path: str, cookie_path: str) -> bool: + request_path = request_path or "/" + if not request_path.startswith("/"): + request_path = "/" + request_path + normalized = (cookie_path or "/").rstrip("/") or "/" + if normalized == "/": + return True + if not request_path.startswith(normalized): + return False + if len(request_path) == len(normalized): + return True + return request_path[len(normalized)] == "/" diff --git a/privx_api/enums.py b/privx_api/enums.py index d784837..cc07dcd 100644 --- a/privx_api/enums.py +++ b/privx_api/enums.py @@ -51,6 +51,8 @@ class HostStoreEnum: RESOLVE = "HOST_STORE.RESOLVE" SEARCH = "HOST_STORE.SEARCH" SETTINGS = "HOST_STORE.SETTINGS" + SESSION_HOST_CERT = "HOST_STORE.SESSION_HOST_CERT" + SESSION_HOST_CERTS = "HOST_STORE.SESSION_HOST_CERTS" STATUS = "HOST_STORE.STATUS" TAGS = "HOST_STORE.TAGS" WHITELISTS = "HOST_STORE.WHITELISTS" @@ -68,6 +70,10 @@ class HostStoreEnum: RESOLVE: "/host-store/api/v1/hosts/resolve", SEARCH: "/host-store/api/v1/hosts/search", SETTINGS: "/host-store/api/v1/settings/default_service_options", + SESSION_HOST_CERT: "/host-store/api/v1/hosts/{host_id}/" + "session_host_certificates/{certificate_id}", + SESSION_HOST_CERTS: "/host-store/api/v1/hosts/{host_id}/" + "session_host_certificates", STATUS: "/host-store/api/v1/status", TAGS: "/host-store/api/v1/hosts/tags", WHITELISTS: "/host-store/api/v1/whitelists", @@ -263,6 +269,41 @@ class DbProxyEnum: urls = {STATUS: "/db-proxy/api/v1/status", CONF: "/db-proxy/api/v1/conf"} +class ApiProxyEnum: + STATUS = "API_PROXY.STATUS" + CONFIGURATION = "API_PROXY.CONFIGURATION" + API_TARGETS = "API_PROXY.API_TARGETS" + API_TARGET = "API_PROXY.API_TARGET" + API_TARGET_SEARCH = "API_PROXY.API_TARGET_SEARCH" + API_TARGET_TAGS = "API_PROXY.API_TARGET_TAGS" + CURRENT_CLIENT_CREDENTIALS = "API_PROXY.CURRENT_CLIENT_CREDENTIALS" + CURRENT_CLIENT_CREDENTIAL = "API_PROXY.CURRENT_CLIENT_CREDENTIAL" + CURRENT_CLIENT_CREDENTIAL_SECRET = "API_PROXY.CURRENT_CLIENT_CREDENTIAL_SECRET" + USER_CLIENT_CREDENTIALS = "API_PROXY.USER_CLIENT_CREDENTIALS" + USER_CLIENT_CREDENTIAL = "API_PROXY.USER_CLIENT_CREDENTIAL" + USER_CLIENT_CREDENTIAL_SECRET = "API_PROXY.USER_CLIENT_CREDENTIAL_SECRET" + + urls = { + STATUS: "/api-proxy/api/v1/status", + CONFIGURATION: "/api-proxy/api/v1/conf", + API_TARGETS: "/api-proxy/api/v1/api-targets", + API_TARGET: "/api-proxy/api/v1/api-targets/{api_target_id}", + API_TARGET_SEARCH: "/api-proxy/api/v1/api-targets/search", + API_TARGET_TAGS: "/api-proxy/api/v1/api-targets/tags", + CURRENT_CLIENT_CREDENTIALS: "/api-proxy/api/v1/users/current/" + "client-credentials", + CURRENT_CLIENT_CREDENTIAL: "/api-proxy/api/v1/users/current/" + "client-credentials/{credential_id}", + CURRENT_CLIENT_CREDENTIAL_SECRET: "/api-proxy/api/v1/users/" + "current/client-credentials/{credential_id}/secret", + USER_CLIENT_CREDENTIALS: "/api-proxy/api/v1/users/{user_id}/client-credentials", + USER_CLIENT_CREDENTIAL: "/api-proxy/api/v1/users/{user_id}/" + "client-credentials/{credential_id}", + USER_CLIENT_CREDENTIAL_SECRET: "/api-proxy/api/v1/users/{user_id}/" + "client-credentials/{credential_id}/secret", + } + + class VaultEnum: METADATA = "VAULT.METADATA" SCHEMAS = "VAULT.SCHEMAS" @@ -591,6 +632,7 @@ class SecretsManagerEnum: class UrlEnum: AUTH = AuthEnum AUTHORIZER = AuthorizerEnum + API_PROXY = ApiProxyEnum CONNECTION_MANAGER = ConnectionManagerEnum DB_PROXY = DbProxyEnum HOST_STORE = HostStoreEnum diff --git a/privx_api/host_store.py b/privx_api/host_store.py index b67d6a0..0fde883 100644 --- a/privx_api/host_store.py +++ b/privx_api/host_store.py @@ -350,3 +350,64 @@ def eval_commands_against_whitelist( }, ) return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def get_session_host_certificates( + self, + host_id: str, + offset: Optional[int] = None, + limit: Optional[int] = None, + sort_key: Optional[str] = None, + sort_dir: Optional[str] = None, + ) -> PrivXAPIResponse: + """ + Get session host certificates. + + Returns: + PrivXAPIResponse + """ + + search_params = self._get_search_params( + offset=offset, + limit=limit, + sortkey=sort_key, + sortdir=sort_dir, + ) + + response_status, data = self._http_get( + UrlEnum.HOST_STORE.SESSION_HOST_CERTS, + path_params={"host_id": host_id}, + query_params=search_params, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def delete_session_host_certificates(self, host_id: str) -> PrivXAPIResponse: + """ + Delete host's session hosts certificates. + + Returns: + PrivXAPIResponse + """ + + response_status, data = self._http_delete( + UrlEnum.HOST_STORE.SESSION_HOST_CERTS, + path_params={"host_id": host_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + + def delete_session_host_certificate( + self, + host_id: str, + certificate_id: str, + ) -> PrivXAPIResponse: + """ + Delete a session host's certificate. + + Returns: + PrivXAPIResponse + """ + + response_status, data = self._http_delete( + UrlEnum.HOST_STORE.SESSION_HOST_CERT, + query_params={"host_id": host_id, "certificate_id": certificate_id}, + ) + return PrivXAPIResponse(response_status, HTTPStatus.OK, data) diff --git a/privx_api/privx_api.py b/privx_api/privx_api.py index c73c3f7..203ce41 100644 --- a/privx_api/privx_api.py +++ b/privx_api/privx_api.py @@ -16,6 +16,7 @@ # # PrivX API lib response. # +from privx_api.api_proxy import ApiProxyAPI from privx_api.auth import AuthAPI from privx_api.authorizer import AuthorizerAPI from privx_api.connection_manager import ConnectionManagerAPI @@ -34,6 +35,7 @@ class PrivXAPI( + ApiProxyAPI, AuthAPI, AuthorizerAPI, ConnectionManagerAPI, diff --git a/privx_api/tests/test_cookie_jar.py b/privx_api/tests/test_cookie_jar.py new file mode 100644 index 0000000..164ee39 --- /dev/null +++ b/privx_api/tests/test_cookie_jar.py @@ -0,0 +1,117 @@ +import pytest + +from privx_api.cookie_jar import RoutingCookieJar + + +def test_store_and_get_header_respects_domain_and_path(): + jar = RoutingCookieJar() + jar.store( + [ + "AWSALB=node-a; Path=/auth; Domain=.example.com", + "ROUTE=node-b; Path=/api/v2", + ], + host="privx.example.com", + request_path="/auth/login", + ) + + assert ( + jar.get_header("privx.example.com", "/auth/session") == "AWSALB=node-a" + ), "cookie with matching domain/path should be returned" + assert ( + jar.get_header("unrelated.com", "/auth/session") is None + ), "non-matching domain should not receive cookies" + assert ( + jar.get_header("privx.example.com", "/api/v2/hosts") == "ROUTE=node-b" + ), "second cookie should be scoped to its own path" + + +def test_store_without_path_uses_default_path(): + jar = RoutingCookieJar() + jar.store( + ["stick=node-1"], + host="api.example.com", + request_path="/api/v1/tokens?foo=bar", + ) + + assert jar.get_header("api.example.com", "/api/v1/projects") == "stick=node-1" + assert jar.get_header("api.example.com", "/status") is None + + +def test_get_header_drops_expired_cookies(monkeypatch): + jar = RoutingCookieJar() + now = {"value": 100} + + def fake_time(): + return now["value"] + + monkeypatch.setattr("privx_api.cookie_jar.time.time", fake_time) + + jar.store(["ROUTE=abc; Max-Age=1"], host="api.example.com", request_path="/") + now["value"] = 200 + + assert jar.get_header("api.example.com", "/") is None + assert jar._cookies == {} + + +@pytest.mark.parametrize( + "date_str, expected", + [ + ("Wed, 17 Dec 2025 18:55:59 GMT", 1765997759.0), + ("Wed, 17-Dec-2025 18:55:59 GMT", 1765997759.0), + ("Wednesday, 17-Dec-25 18:55:59 GMT", 1765997759.0), + ("Wed, 17-Dec-25 18:55:59 GMT", 1765997759.0), + ], +) +def test_parse_http_date_valid(date_str, expected): + assert RoutingCookieJar._parse_http_date(date_str) == expected + + +def test_parse_http_date_invalid(): + assert RoutingCookieJar._parse_http_date("Wed, 17 Dec 2025 18:55:59 PST") is None + assert RoutingCookieJar._parse_http_date("") is None + + +def test_store_overwrites_existing_cookie(monkeypatch): + jar = RoutingCookieJar() + now = {"value": 100} + + def fake_time(): + return now["value"] + + monkeypatch.setattr("privx_api.cookie_jar.time.time", fake_time) + + jar.store(["ROUTE=old; Max-Age=5"], host="api.example.com", request_path="/api") + assert jar.get_header("api.example.com", "/api") == "ROUTE=old" + + now["value"] = 102 + jar.store(["ROUTE=new; Max-Age=5"], host="api.example.com", request_path="/api") + assert jar.get_header("api.example.com", "/api") == "ROUTE=new" + + +@pytest.mark.parametrize( + "request_path, expected", + [ + ("/api/v1/resource", "/api/v1"), + ("/api", "/"), + ("api", "/"), + ("/", "/"), + ("", "/"), + ], +) +def test_default_path(request_path, expected): + assert RoutingCookieJar._default_path(request_path) == expected + + +@pytest.mark.parametrize( + "request_path, cookie_path, expected", + [ + ("/", "/", True), + ("/foo/bar", "/foo", True), + ("/foo", "/foo", True), + ("/foo", "/foo/bar", False), + ("/foobar", "/foo", False), + ("foo/bar", "/foo", True), + ], +) +def test_path_matches(request_path, cookie_path, expected): + assert RoutingCookieJar._path_matches(request_path, cookie_path) is expected diff --git a/setup.py b/setup.py index 39ac13b..cd7919f 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setup( name="privx_api", - version="41.0.0", + version="42.0.0", packages=["privx_api"], license="Apache Licence 2.0", url="https://github.com/SSHcom/privx-sdk-for-python",