diff --git a/backend/app.py b/backend/app.py index 0f1e0f3..c14ebc4 100644 --- a/backend/app.py +++ b/backend/app.py @@ -57,6 +57,7 @@ from routers.mpls import mpls as mpls_router from routers.ipsec import ipsec as ipsec_router from routers.l2tp import l2tp as l2tp_router +from routers.pki import pki as pki_router from routers import version as version_router # Global variables @@ -302,6 +303,7 @@ async def get_permissions(request: Request) -> dict: app.include_router(mpls_router.router) app.include_router(ipsec_router.router) app.include_router(l2tp_router.router) +app.include_router(pki_router.router) app.include_router(version_router.router) diff --git a/backend/rbac_permissions.py b/backend/rbac_permissions.py index 51680d2..563c914 100644 --- a/backend/rbac_permissions.py +++ b/backend/rbac_permissions.py @@ -50,6 +50,9 @@ class FeatureGroup(str, Enum): WIREGUARD = "WIREGUARD" L2TP = "L2TP" + # PKI + PKI = "PKI" + # Routing features (parent/child hierarchy) ROUTING = "ROUTING" UNICAST_PROTOCOLS = "UNICAST_PROTOCOLS" @@ -141,6 +144,7 @@ class BuiltInRole(str, Enum): FeatureGroup.IPSEC: PermissionLevel.WRITE, FeatureGroup.WIREGUARD: PermissionLevel.WRITE, FeatureGroup.L2TP: PermissionLevel.WRITE, + FeatureGroup.PKI: PermissionLevel.WRITE, FeatureGroup.ROUTING: PermissionLevel.WRITE, FeatureGroup.UNICAST_PROTOCOLS: PermissionLevel.WRITE, FeatureGroup.BGP: PermissionLevel.WRITE, @@ -203,6 +207,7 @@ class BuiltInRole(str, Enum): FeatureGroup.IPSEC: PermissionLevel.WRITE, FeatureGroup.WIREGUARD: PermissionLevel.WRITE, FeatureGroup.L2TP: PermissionLevel.WRITE, + FeatureGroup.PKI: PermissionLevel.WRITE, FeatureGroup.ROUTING: PermissionLevel.WRITE, FeatureGroup.UNICAST_PROTOCOLS: PermissionLevel.WRITE, FeatureGroup.BGP: PermissionLevel.WRITE, @@ -266,6 +271,7 @@ class BuiltInRole(str, Enum): FeatureGroup.IPSEC: PermissionLevel.READ, FeatureGroup.WIREGUARD: PermissionLevel.READ, FeatureGroup.L2TP: PermissionLevel.READ, + FeatureGroup.PKI: PermissionLevel.READ, FeatureGroup.ROUTING: PermissionLevel.READ, FeatureGroup.UNICAST_PROTOCOLS: PermissionLevel.READ, FeatureGroup.BGP: PermissionLevel.READ, @@ -370,6 +376,7 @@ async def get_user_permissions( FeatureGroup.IPSEC, FeatureGroup.WIREGUARD, FeatureGroup.L2TP, + FeatureGroup.PKI, FeatureGroup.ROUTING, FeatureGroup.UNICAST_PROTOCOLS, FeatureGroup.BGP, @@ -452,6 +459,7 @@ async def get_user_permissions( FeatureGroup.IPSEC, FeatureGroup.WIREGUARD, FeatureGroup.L2TP, + FeatureGroup.PKI, FeatureGroup.ROUTING, # Added for three-level hierarchy FeatureGroup.UNICAST_PROTOCOLS, # Added for three-level hierarchy FeatureGroup.BGP, diff --git a/backend/routers/pki/__init__.py b/backend/routers/pki/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/backend/routers/pki/__init__.py @@ -0,0 +1 @@ + diff --git a/backend/routers/pki/pki.py b/backend/routers/pki/pki.py new file mode 100644 index 0000000..f400680 --- /dev/null +++ b/backend/routers/pki/pki.py @@ -0,0 +1,978 @@ +""" +PKI (Public Key Infrastructure) Router + +API endpoints for managing VyOS PKI configuration including: +- Certificate Authorities (CA) +- Certificates (including ACME/Let's Encrypt) +- Diffie-Hellman parameters +- Key Pairs +- OpenSSH keys +- OpenVPN shared secrets +- X509 defaults + +Uses session-based architecture - VyOS instance comes from user's active session. +""" + +from fastapi import APIRouter, HTTPException, Request +from pydantic import BaseModel, Field +from typing import List, Dict, Optional, Any, Literal +from session_vyos_service import get_session_vyos_service +from vyos_builders.pki import PKIBatchBuilder +from vyos_mappers.pki import PKIMapper +from fastapi_permissions import require_read_permission, require_write_permission +from rbac_permissions import FeatureGroup +import inspect +import logging +import datetime + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/vyos/pki", tags=["pki"]) + +# Builder infrastructure methods that must never be invokable via the batch API +_INTERNAL_BUILDER_METHODS = frozenset({ + "add_set", "add_delete", "get_operations", "is_empty", "clear", "operation_count", +}) + +# Operations whose values contain PEM/key data that must be newline-stripped +_PEM_VALUE_OPS = frozenset({ + "set_ca_certificate", "set_ca_private_key", "set_ca_crl", + "set_certificate_cert", "set_certificate_private_key", + "set_dh_parameters", + "set_key_pair_private_key", "set_key_pair_public_key", + "set_openssh_private_key", "set_openssh_public_key", + "set_openvpn_shared_secret_key", +}) + + +def _normalize_pem(value: str) -> str: + """Strip PEM headers/footers and newlines so VyOS gets a single-line base64 value. + + VyOS config values cannot contain newlines. If *value* looks like PEM-encoded + data, the ``-----BEGIN …-----`` / ``-----END …-----`` wrapper lines are removed + and all remaining whitespace is collapsed, yielding a continuous base64 string. + Non-PEM values are returned unchanged. + """ + import re + stripped = value.strip() + if stripped.startswith("-----BEGIN "): + # Remove header and footer lines, then collapse whitespace + stripped = re.sub(r"-----[A-Z0-9 ]+-----", "", stripped) + return stripped.replace("\n", "").replace("\r", "").replace(" ", "") + return stripped + + +# ======================================================================== +# Pydantic Models +# ======================================================================== + +class PKIBatchOperation(BaseModel): + """Single operation in a batch request.""" + op: str = Field(..., description="Operation name (e.g., create_ca, set_ca_certificate)") + value: Optional[str] = Field(None, description="Operation value") + + +class PKIBatchRequest(BaseModel): + """Batch request for PKI configuration changes.""" + item_name: str = Field(..., description="Primary item name (e.g., CA name, cert name, or placeholder for global ops)") + operations: List[PKIBatchOperation] + + +class VyOSResponse(BaseModel): + """Standard response from VyOS operations.""" + success: bool + data: Optional[Dict[str, Any]] = None + error: Optional[str] = None + + +# ======================================================================== +# Endpoint 1: Capabilities +# ======================================================================== + +@router.get("/capabilities") +async def get_pki_capabilities(request: Request): + """ + Get PKI capabilities based on device VyOS version. + + Returns feature flags indicating which operations are supported. + """ + await require_read_permission(request, FeatureGroup.PKI) + try: + service = get_session_vyos_service(request) + version = service.get_version() + + builder = PKIBatchBuilder(version=version) + return builder.get_capabilities() + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint 2: Config (Generalized Data) +# ======================================================================== + +@router.get("/config") +async def get_pki_config(request: Request, refresh: bool = False): + """ + Get PKI configuration from VyOS. + + Returns generalized PKI configuration data including: + - Certificate Authorities + - Certificates (with ACME info) + - Diffie-Hellman parameters + - Key Pairs + - OpenSSH keys + - OpenVPN shared secrets + - X509 defaults + """ + await require_read_permission(request, FeatureGroup.PKI) + try: + service = get_session_vyos_service(request) + version = service.get_version() + + full_config = service.get_full_config(refresh=refresh) + + mapper = PKIMapper(version) + config = mapper.parse_config(full_config) + + # Convert CA dict to list for frontend + ca_list = [] + for name, data in config.get("ca", {}).items(): + # Mask private keys + masked = {**data} + if masked.get("private_key"): + masked["private_key"] = "***" + ca_list.append(masked) + + # Convert certificates dict to list + cert_list = [] + for name, data in config.get("certificates", {}).items(): + masked = {**data} + if masked.get("private_key"): + masked["private_key"] = "***" + cert_list.append(masked) + + # Convert DH dict to list + dh_list = [] + for name, data in config.get("dh", {}).items(): + masked = {**data} + if masked.get("parameters"): + masked["parameters"] = "***" + dh_list.append(masked) + + # Convert key pairs dict to list + key_pair_list = [] + for name, data in config.get("key_pairs", {}).items(): + masked = {**data} + if masked.get("private_key"): + masked["private_key"] = "***" + key_pair_list.append(masked) + + # Convert OpenSSH dict to list + openssh_list = [] + for name, data in config.get("openssh", {}).items(): + masked = {**data} + if masked.get("private_key"): + masked["private_key"] = "***" + openssh_list.append(masked) + + # Convert OpenVPN shared secrets dict to list + openvpn_list = [] + for name, data in config.get("openvpn_shared_secrets", {}).items(): + masked = {**data} + if masked.get("key"): + masked["key"] = "***" + openvpn_list.append(masked) + + return { + "configured": config.get("configured", False), + "ca": ca_list, + "certificates": cert_list, + "dh": dh_list, + "key_pairs": key_pair_list, + "openssh": openssh_list, + "openvpn_shared_secrets": openvpn_list, + "x509_defaults": config.get("x509_defaults", {}), + "totals": { + "ca": len(ca_list), + "certificates": len(cert_list), + "dh": len(dh_list), + "key_pairs": len(key_pair_list), + "openssh": len(openssh_list), + "openvpn_shared_secrets": len(openvpn_list), + }, + } + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint: Reveal (unmasked value for a specific PKI item field) +# ======================================================================== + +class PKIRevealRequest(BaseModel): + """Request to reveal an unmasked PKI value.""" + item_type: str = Field(..., description="PKI item type: ca, certificate, dh, key_pair, openssh, openvpn") + item_name: str = Field(..., description="Item name") + field: str = Field(..., description="Field to reveal: certificate, private_key, public_key, parameters, key, crl") + + +@router.post("/reveal") +async def reveal_pki_value(http_request: Request, request: PKIRevealRequest): + """ + Reveal an unmasked PKI value for viewing/copying. + + Requires WRITE permission since this exposes sensitive material. + """ + await require_write_permission(http_request, FeatureGroup.PKI) + try: + service = get_session_vyos_service(http_request) + version = service.get_version() + full_config = service.get_full_config(refresh=False) + + mapper = PKIMapper(version) + config = mapper.parse_config(full_config) + + # Map item_type to config section + type_map = { + "ca": "ca", + "certificate": "certificates", + "dh": "dh", + "key_pair": "key_pairs", + "openssh": "openssh", + "openvpn": "openvpn_shared_secrets", + } + + section = type_map.get(request.item_type) + if not section: + raise HTTPException(status_code=400, detail=f"Invalid item type: {request.item_type}") + + items = config.get(section, {}) + item = items.get(request.item_name) + if not item: + raise HTTPException(status_code=404, detail=f"Item '{request.item_name}' not found") + + # Allowed fields per type + allowed_fields = { + "ca": ["certificate", "private_key", "crl"], + "certificate": ["certificate", "private_key"], + "dh": ["parameters"], + "key_pair": ["private_key", "public_key"], + "openssh": ["private_key", "public_key"], + "openvpn": ["key"], + } + + if request.field not in allowed_fields.get(request.item_type, []): + raise HTTPException(status_code=400, detail=f"Invalid field '{request.field}' for type '{request.item_type}'") + + value = item.get(request.field) + + return {"value": value} + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error revealing PKI value") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint 3: Batch Operations +# ======================================================================== + +@router.post("/batch", response_model=VyOSResponse) +async def pki_batch_configure(http_request: Request, request: PKIBatchRequest): + """ + Execute a batch of PKI configuration operations. + + All operations are executed in a single VyOS commit for atomicity. + + The item_name field serves as the primary identifier for the operations + (e.g., CA name for CA ops, cert name for cert ops, or a + placeholder like "pki" for global settings like X509 defaults). + + Each operation's `op` field maps to a method on PKIBatchBuilder. + The `value` field provides additional parameters when needed. + """ + await require_write_permission(http_request, FeatureGroup.PKI) + try: + service = get_session_vyos_service(http_request) + version = service.get_version() + + builder = PKIBatchBuilder(version=version) + + for operation in request.operations: + if operation.op.startswith("_") or operation.op in _INTERNAL_BUILDER_METHODS: + raise HTTPException(status_code=400, detail=f"Invalid operation: {operation.op}") + + method = getattr(builder, operation.op, None) + if not callable(method): + raise HTTPException( + status_code=400, + detail=f"Unknown operation: {operation.op}" + ) + + sig = inspect.signature(method) + params = [p for p in sig.parameters.keys() if p != "self"] + + # Normalize PEM values (strip headers/newlines) for cert/key ops + op_value = operation.value + if op_value is not None and operation.op in _PEM_VALUE_OPS: + op_value = _normalize_pem(op_value) + + args = [] + if len(params) >= 1: + args.append(request.item_name) + if len(params) >= 2 and op_value is not None: + if len(params) >= 3: + parts = op_value.split("|", len(params) - 2) + args.extend(parts) + else: + args.append(op_value) + + method(*args) + + if builder.is_empty(): + return VyOSResponse( + success=True, + data={"message": "No operations to execute"}, + ) + + response = service.execute_batch(builder) + + return VyOSResponse( + success=response.status == 200, + data={"message": "PKI configuration updated"}, + error=response.error if response.error else None, + ) + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint 4: Generate CA +# ======================================================================== + +class GenerateCARequest(BaseModel): + """Request to generate a self-signed Certificate Authority.""" + name: str = Field(..., description="CA name") + key_type: Literal["rsa", "ec"] = Field("rsa", description="Key type: rsa or ec") + key_size: int = Field(2048, description="Key size in bits (RSA: 2048/3072/4096, EC: 256/384/521)") + country: Optional[str] = Field(None, description="Country code (2 letters)") + state: Optional[str] = Field(None, description="State or province") + locality: Optional[str] = Field(None, description="Locality or city") + organization: Optional[str] = Field(None, description="Organization name") + common_name: str = Field(..., description="Common Name (CN)") + days: int = Field(3650, description="Validity period in days") + encrypt_key: bool = Field(False, description="Encrypt the private key with a passphrase") + passphrase: Optional[str] = Field(None, description="Passphrase for key encryption (required if encrypt_key is True)") + revoke: bool = Field(False, description="Mark as revoked") + system_install: bool = Field(False, description="Install to system trust store") + + +@router.post("/generate-ca", response_model=VyOSResponse) +async def generate_ca(http_request: Request, request: GenerateCARequest): + """ + Generate a self-signed CA certificate and install it on the device. + + Uses Python's cryptography library to generate the CA cert+key, + then installs via the normal batch config API. + """ + await require_write_permission(http_request, FeatureGroup.PKI) + try: + from cryptography import x509 + from cryptography.x509.oid import NameOID + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import rsa, ec + + # Validate + if request.encrypt_key and not request.passphrase: + raise HTTPException(status_code=400, detail="Passphrase required when encrypt_key is True") + + if request.key_type == "rsa" and request.key_size not in (2048, 3072, 4096): + raise HTTPException(status_code=400, detail="RSA key size must be 2048, 3072, or 4096") + + if request.key_type == "ec" and request.key_size not in (256, 384, 521): + raise HTTPException(status_code=400, detail="EC key size must be 256, 384, or 521") + + # Generate private key + if request.key_type == "rsa": + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=request.key_size, + ) + else: + curve_map = {256: ec.SECP256R1(), 384: ec.SECP384R1(), 521: ec.SECP521R1()} + private_key = ec.generate_private_key(curve_map[request.key_size]) + + # Build subject + name_attrs = [] + if request.country: + name_attrs.append(x509.NameAttribute(NameOID.COUNTRY_NAME, request.country[:2])) + if request.state: + name_attrs.append(x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, request.state)) + if request.locality: + name_attrs.append(x509.NameAttribute(NameOID.LOCALITY_NAME, request.locality)) + if request.organization: + name_attrs.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, request.organization)) + name_attrs.append(x509.NameAttribute(NameOID.COMMON_NAME, request.common_name)) + subject = issuer = x509.Name(name_attrs) + + # Build certificate + now = datetime.datetime.now(datetime.timezone.utc) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(private_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=request.days)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension( + x509.KeyUsage( + digital_signature=True, key_cert_sign=True, crl_sign=True, + content_commitment=False, key_encipherment=False, + data_encipherment=False, key_agreement=False, + encipher_only=False, decipher_only=False, + ), + critical=True, + ) + .add_extension( + x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()), + critical=False, + ) + .sign(private_key, hashes.SHA256()) + ) + + # Serialize and strip PEM headers/newlines for VyOS config + cert_pem = _normalize_pem( + cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") + ) + + if request.encrypt_key: + key_pem = _normalize_pem(private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.BestAvailableEncryption(request.passphrase.encode()), + ).decode("utf-8")) + else: + key_pem = _normalize_pem(private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ).decode("utf-8")) + + # Install on device via batch operations + service = get_session_vyos_service(http_request) + version = service.get_version() + builder = PKIBatchBuilder(version=version) + + builder.create_ca(request.name) + builder.set_ca_certificate(request.name, cert_pem) + builder.set_ca_private_key(request.name, key_pem) + + if request.encrypt_key: + builder.set_ca_private_password_protected(request.name) + if request.revoke: + builder.set_ca_revoke(request.name) + if request.system_install: + builder.set_ca_system_install(request.name) + + response = service.execute_batch(builder) + + return VyOSResponse( + success=response.status == 200, + data={"message": f"CA '{request.name}' generated and installed"}, + error=response.error if response.error else None, + ) + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error generating CA") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint 5: Generate Key Pair +# ======================================================================== + +class GenerateKeyPairRequest(BaseModel): + """Request to generate a public/private key pair.""" + name: str = Field(..., description="Key pair name") + key_type: Literal["rsa", "ec"] = Field("rsa", description="Key type: rsa or ec") + key_size: int = Field(2048, description="Key size (RSA: 2048/3072/4096, EC: 256/384/521)") + encrypt_key: bool = Field(False, description="Encrypt the private key") + passphrase: Optional[str] = Field(None, description="Passphrase (required if encrypt_key is True)") + + +@router.post("/generate-key-pair", response_model=VyOSResponse) +async def generate_key_pair(http_request: Request, request: GenerateKeyPairRequest): + """Generate a key pair and install it on the device.""" + await require_write_permission(http_request, FeatureGroup.PKI) + try: + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import rsa, ec + + if request.encrypt_key and not request.passphrase: + raise HTTPException(status_code=400, detail="Passphrase required when encrypt_key is True") + if request.key_type == "rsa" and request.key_size not in (2048, 3072, 4096): + raise HTTPException(status_code=400, detail="RSA key size must be 2048, 3072, or 4096") + if request.key_type == "ec" and request.key_size not in (256, 384, 521): + raise HTTPException(status_code=400, detail="EC key size must be 256, 384, or 521") + + # Generate private key + if request.key_type == "rsa": + private_key = rsa.generate_private_key(public_exponent=65537, key_size=request.key_size) + else: + curve_map = {256: ec.SECP256R1(), 384: ec.SECP384R1(), 521: ec.SECP521R1()} + private_key = ec.generate_private_key(curve_map[request.key_size]) + + # Serialize private key + if request.encrypt_key: + priv_pem = _normalize_pem(private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.BestAvailableEncryption(request.passphrase.encode()), + ).decode("utf-8")) + else: + priv_pem = _normalize_pem(private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ).decode("utf-8")) + + # Serialize public key + pub_pem = _normalize_pem(private_key.public_key().public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("utf-8")) + + # Install on device + service = get_session_vyos_service(http_request) + version = service.get_version() + builder = PKIBatchBuilder(version=version) + + builder.create_key_pair(request.name) + builder.set_key_pair_private_key(request.name, priv_pem) + builder.set_key_pair_public_key(request.name, pub_pem) + if request.encrypt_key: + builder.set_key_pair_private_password_protected(request.name) + + response = service.execute_batch(builder) + + return VyOSResponse( + success=response.status == 200, + data={"message": f"Key pair '{request.name}' generated and installed"}, + error=response.error if response.error else None, + ) + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error generating key pair") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint 6: Generate DH Parameters +# ======================================================================== + +class GenerateDHRequest(BaseModel): + """Request to generate Diffie-Hellman parameters.""" + name: str = Field(..., description="DH parameters name") + key_size: int = Field(2048, description="Key size in bits (2048, 3072, or 4096)") + + +@router.post("/generate-dh", response_model=VyOSResponse) +async def generate_dh(http_request: Request, request: GenerateDHRequest): + """ + Generate DH parameters and install them on the device. + + Note: DH parameter generation is computationally expensive and may take + 30 seconds or more depending on key size. + """ + await require_write_permission(http_request, FeatureGroup.PKI) + try: + from cryptography.hazmat.primitives.asymmetric import dh + from cryptography.hazmat.primitives import serialization + + if request.key_size not in (2048, 3072, 4096): + raise HTTPException(status_code=400, detail="DH key size must be 2048, 3072, or 4096") + + # Generate DH parameters (this is CPU-intensive) + parameters = dh.generate_parameters(generator=2, key_size=request.key_size) + + # Serialize to PEM and normalize + params_pem = _normalize_pem(parameters.parameter_bytes( + serialization.Encoding.PEM, + serialization.ParameterFormat.PKCS3, + ).decode("utf-8")) + + # Install on device + service = get_session_vyos_service(http_request) + version = service.get_version() + builder = PKIBatchBuilder(version=version) + + builder.create_dh(request.name) + builder.set_dh_parameters(request.name, params_pem) + + response = service.execute_batch(builder) + + return VyOSResponse( + success=response.status == 200, + data={"message": f"DH parameters '{request.name}' generated and installed"}, + error=response.error if response.error else None, + ) + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error generating DH parameters") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint 7: Generate Certificate (signed by a CA) +# ======================================================================== + +class GenerateCertificateRequest(BaseModel): + """Request to generate a certificate signed by an existing CA.""" + name: str = Field(..., description="Certificate name") + ca_name: str = Field(..., description="Name of the CA to sign with") + key_type: Literal["rsa", "ec"] = Field("rsa", description="Key type: rsa or ec") + key_size: int = Field(2048, description="Key size (RSA: 2048/3072/4096, EC: 256/384/521)") + country: Optional[str] = Field(None, description="Country code (2 letters)") + state: Optional[str] = Field(None, description="State or province") + locality: Optional[str] = Field(None, description="Locality or city") + organization: Optional[str] = Field(None, description="Organization name") + common_name: str = Field(..., description="Common Name (CN)") + days: int = Field(365, description="Validity period in days") + subject_alt_names: Optional[List[str]] = Field(None, description="Subject Alternative Names (DNS names or IPs)") + encrypt_key: bool = Field(False, description="Encrypt the private key with a passphrase") + passphrase: Optional[str] = Field(None, description="Passphrase for key encryption") + + +@router.post("/generate-certificate", response_model=VyOSResponse) +async def generate_certificate(http_request: Request, request: GenerateCertificateRequest): + """ + Generate a certificate signed by an existing CA on the device. + + Reads the CA's certificate and private key from VyOS config, + generates a new key pair, builds a certificate, signs it with the CA, + and installs everything on the device. + """ + await require_write_permission(http_request, FeatureGroup.PKI) + try: + from cryptography import x509 + from cryptography.x509.oid import NameOID + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import rsa, ec + import ipaddress + + # Validate key params + if request.encrypt_key and not request.passphrase: + raise HTTPException(status_code=400, detail="Passphrase required when encrypt_key is True") + if request.key_type == "rsa" and request.key_size not in (2048, 3072, 4096): + raise HTTPException(status_code=400, detail="RSA key size must be 2048, 3072, or 4096") + if request.key_type == "ec" and request.key_size not in (256, 384, 521): + raise HTTPException(status_code=400, detail="EC key size must be 256, 384, or 521") + + # Get VyOS config to read CA cert and private key + service = get_session_vyos_service(http_request) + version = service.get_version() + full_config = service.get_full_config(refresh=False) + + mapper = PKIMapper(version) + config = mapper.parse_config(full_config) + + ca_data = config.get("ca", {}).get(request.ca_name) + if not ca_data: + raise HTTPException(status_code=404, detail=f"CA '{request.ca_name}' not found") + + ca_cert_raw = ca_data.get("certificate") + ca_key_raw = ca_data.get("private_key") + if not ca_cert_raw: + raise HTTPException(status_code=400, detail=f"CA '{request.ca_name}' has no certificate") + if not ca_key_raw: + raise HTTPException(status_code=400, detail=f"CA '{request.ca_name}' has no private key") + if ca_data.get("password_protected"): + raise HTTPException( + status_code=400, + detail=f"CA '{request.ca_name}' has an encrypted private key. " + "Cannot use password-protected CAs for signing.", + ) + + # VyOS stores certs/keys as single-line base64 without PEM headers. + # Reconstruct valid PEM for the cryptography library. + def _reconstruct_pem(raw: str, label: str) -> bytes: + # Insert newlines every 64 chars for valid PEM + import textwrap + b64 = raw.strip() + wrapped = "\n".join(textwrap.wrap(b64, 64)) + return f"-----BEGIN {label}-----\n{wrapped}\n-----END {label}-----\n".encode() + + ca_cert = x509.load_pem_x509_certificate(_reconstruct_pem(ca_cert_raw, "CERTIFICATE")) + ca_private_key = serialization.load_pem_private_key( + _reconstruct_pem(ca_key_raw, "PRIVATE KEY"), + password=None, + ) + + # Generate the certificate's private key + if request.key_type == "rsa": + cert_private_key = rsa.generate_private_key( + public_exponent=65537, key_size=request.key_size, + ) + else: + curve_map = {256: ec.SECP256R1(), 384: ec.SECP384R1(), 521: ec.SECP521R1()} + cert_private_key = ec.generate_private_key(curve_map[request.key_size]) + + # Build subject + name_attrs = [] + if request.country: + name_attrs.append(x509.NameAttribute(NameOID.COUNTRY_NAME, request.country[:2])) + if request.state: + name_attrs.append(x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, request.state)) + if request.locality: + name_attrs.append(x509.NameAttribute(NameOID.LOCALITY_NAME, request.locality)) + if request.organization: + name_attrs.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, request.organization)) + name_attrs.append(x509.NameAttribute(NameOID.COMMON_NAME, request.common_name)) + subject = x509.Name(name_attrs) + + # Build certificate + now = datetime.datetime.now(datetime.timezone.utc) + builder = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(ca_cert.subject) + .public_key(cert_private_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=request.days)) + .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) + .add_extension( + x509.KeyUsage( + digital_signature=True, key_encipherment=True, + content_commitment=False, data_encipherment=False, + key_cert_sign=False, crl_sign=False, + key_agreement=False, encipher_only=False, decipher_only=False, + ), + critical=True, + ) + .add_extension( + x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_private_key.public_key()), + critical=False, + ) + .add_extension( + x509.SubjectKeyIdentifier.from_public_key(cert_private_key.public_key()), + critical=False, + ) + ) + + # Add Subject Alternative Names if provided + if request.subject_alt_names: + san_entries: list = [] + for san in request.subject_alt_names: + san = san.strip() + if not san: + continue + # Try to parse as IP address first + try: + ip = ipaddress.ip_address(san) + san_entries.append(x509.IPAddress(ip)) + except ValueError: + san_entries.append(x509.DNSName(san)) + + if san_entries: + builder = builder.add_extension( + x509.SubjectAlternativeName(san_entries), + critical=False, + ) + + # Sign with CA key + cert = builder.sign(ca_private_key, hashes.SHA256()) + + # Serialize + cert_pem = _normalize_pem( + cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") + ) + + if request.encrypt_key: + key_pem = _normalize_pem(cert_private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.BestAvailableEncryption(request.passphrase.encode()), + ).decode("utf-8")) + else: + key_pem = _normalize_pem(cert_private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ).decode("utf-8")) + + # Install on device via batch operations + batch_builder = PKIBatchBuilder(version=version) + + batch_builder.create_certificate(request.name) + batch_builder.set_certificate_cert(request.name, cert_pem) + batch_builder.set_certificate_private_key(request.name, key_pem) + if request.encrypt_key: + batch_builder.set_certificate_private_password_protected(request.name) + + response = service.execute_batch(batch_builder) + + return VyOSResponse( + success=response.status == 200, + data={"message": f"Certificate '{request.name}' generated and signed by CA '{request.ca_name}'"}, + error=response.error if response.error else None, + ) + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error generating certificate") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint 8: Generate OpenVPN Shared Secret +# ======================================================================== + +class GenerateOpenVPNSecretRequest(BaseModel): + """Request to generate an OpenVPN shared secret.""" + name: str = Field(..., description="Shared secret name") + + +@router.post("/generate-openvpn-shared-secret", response_model=VyOSResponse) +async def generate_openvpn_shared_secret(http_request: Request, request: GenerateOpenVPNSecretRequest): + """ + Generate an OpenVPN shared secret using VyOS's built-in generate command. + + Uses: generate pki openvpn shared-secret install + This generates the secret and commits it to the config automatically. + """ + await require_write_permission(http_request, FeatureGroup.PKI) + try: + service = get_session_vyos_service(http_request) + + response = service.device.generate( + path=["pki", "openvpn", "shared-secret", "install", request.name] + ) + + if response.status != 200: + return VyOSResponse( + success=False, + error=response.error or "Failed to generate OpenVPN shared secret", + ) + + return VyOSResponse( + success=True, + data={"message": f"OpenVPN shared secret '{request.name}' generated and installed"}, + ) + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error generating OpenVPN shared secret") + raise HTTPException(status_code=500, detail="Internal server error") + + +# ======================================================================== +# Endpoint 9: Generate OpenSSH Key +# ======================================================================== + +class GenerateOpenSSHRequest(BaseModel): + """Request to generate an OpenSSH key pair.""" + name: str = Field(..., description="OpenSSH key name") + key_size: int = Field(2048, description="RSA key size in bits (2048/3072/4096)") + + +@router.post("/generate-openssh", response_model=VyOSResponse) +async def generate_openssh(http_request: Request, request: GenerateOpenSSHRequest): + """ + Generate an OpenSSH RSA key pair and install it on the device. + + Generates an RSA key, serialises the private key in both OpenSSH and PKCS8 + formats, then attempts to install using the OpenSSH format first (matching + VyOS's own generate command) and falls back to PKCS8 if that fails. + """ + await require_write_permission(http_request, FeatureGroup.PKI) + try: + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import rsa + + if request.key_size not in (2048, 3072, 4096): + raise HTTPException(status_code=400, detail="RSA key size must be 2048, 3072, or 4096") + + # Generate RSA key (VyOS openssh only supports ssh-rsa) + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=request.key_size, + ) + + # Prepare both private key formats to try + priv_openssh = _normalize_pem(private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.OpenSSH, + serialization.NoEncryption(), + ).decode("utf-8")) + + priv_pkcs8 = _normalize_pem(private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ).decode("utf-8")) + + # Serialize public key in OpenSSH format and extract just the base64 part + pub_ssh = private_key.public_key().public_bytes( + serialization.Encoding.OpenSSH, + serialization.PublicFormat.OpenSSH, + ).decode("utf-8") + pub_parts = pub_ssh.split(" ", 2) + pub_key_data = pub_parts[1] if len(pub_parts) >= 2 else pub_ssh + + service = get_session_vyos_service(http_request) + version = service.get_version() + + # Try OpenSSH format first (matches VyOS's own generate output) + for fmt_name, priv_pem in [("OpenSSH", priv_openssh), ("PKCS8", priv_pkcs8)]: + builder = PKIBatchBuilder(version=version) + builder.create_openssh(request.name) + builder.set_openssh_public_type(request.name, "ssh-rsa") + builder.set_openssh_public_key(request.name, pub_key_data) + builder.set_openssh_private_key(request.name, priv_pem) + + response = service.execute_batch(builder) + + if response.status == 200: + return VyOSResponse( + success=True, + data={"message": f"OpenSSH key '{request.name}' generated and installed (format: {fmt_name})"}, + ) + + # If first format failed, delete the partial node before retrying + if fmt_name == "OpenSSH": + cleanup = PKIBatchBuilder(version=version) + cleanup.delete_openssh(request.name) + service.execute_batch(cleanup) + + # Both formats failed + return VyOSResponse( + success=False, + error=response.error or "Failed to install OpenSSH key — both OpenSSH and PKCS8 formats were rejected by VyOS", + ) + + return VyOSResponse( + success=response.status == 200, + data={"message": f"OpenSSH key '{request.name}' generated and installed"}, + error=response.error if response.error else None, + ) + except HTTPException: + raise + except Exception: + logger.exception("Unhandled error generating OpenSSH key") + raise HTTPException(status_code=500, detail="Internal server error") diff --git a/backend/vyos_builders/__init__.py b/backend/vyos_builders/__init__.py index 1326f83..df4d96e 100644 --- a/backend/vyos_builders/__init__.py +++ b/backend/vyos_builders/__init__.py @@ -35,6 +35,7 @@ from .load_balancing import LoadBalancingBatchBuilder from .isis import IsisBatchBuilder from .ipsec import IPSecBatchBuilder +from .pki import PKIBatchBuilder # Directly use the self-contained builders EthernetBatchBuilder = EthernetInterfaceBuilderMixin @@ -77,4 +78,5 @@ "LoadBalancingBatchBuilder", "IsisBatchBuilder", "IPSecBatchBuilder", + "PKIBatchBuilder", ] diff --git a/backend/vyos_builders/pki/__init__.py b/backend/vyos_builders/pki/__init__.py new file mode 100644 index 0000000..4a4c537 --- /dev/null +++ b/backend/vyos_builders/pki/__init__.py @@ -0,0 +1,5 @@ +"""PKI Batch Builder package.""" + +from .pki import PKIBatchBuilder + +__all__ = ["PKIBatchBuilder"] diff --git a/backend/vyos_builders/pki/pki.py b/backend/vyos_builders/pki/pki.py new file mode 100644 index 0000000..166c262 --- /dev/null +++ b/backend/vyos_builders/pki/pki.py @@ -0,0 +1,387 @@ +""" +PKI (Public Key Infrastructure) Batch Builder + +Provides all batch operations for PKI configuration including: +- Certificate Authorities (CA) +- Certificates (including ACME) +- Diffie-Hellman parameters +- Key Pairs +- OpenSSH keys +- OpenVPN shared secrets +- X509 defaults +""" + +from typing import List, Dict, Any +from vyos_mappers import CommandMapperRegistry + + +class PKIBatchBuilder: + """Complete batch builder for PKI operations.""" + + def __init__(self, version: str): + self.version = version + self._operations: List[Dict[str, Any]] = [] + self.mappers = CommandMapperRegistry.get_all_mappers(version) + self.mapper_key = "pki" + + # ======================================================================== + # Core Batch Operations + # ======================================================================== + + def add_set(self, path: List[str]) -> "PKIBatchBuilder": + if path: + self._operations.append({"op": "set", "path": path}) + return self + + def add_delete(self, path: List[str]) -> "PKIBatchBuilder": + if path: + self._operations.append({"op": "delete", "path": path}) + return self + + def clear(self) -> None: + self._operations = [] + + def get_operations(self) -> List[Dict[str, Any]]: + return self._operations.copy() + + def operation_count(self) -> int: + return len(self._operations) + + def is_empty(self) -> bool: + return len(self._operations) == 0 + + # ======================================================================== + # Certificate Authority (CA) + # ======================================================================== + + def create_ca(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_ca_path(name)) + + def delete_ca(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_delete_path(name)) + + def set_ca_certificate(self, name: str, cert: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_ca_certificate_path(name, cert)) + + def delete_ca_certificate(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_certificate_delete_path(name)) + + def set_ca_crl(self, name: str, crl: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_ca_crl_path(name, crl)) + + def delete_ca_crl(self, name: str, crl: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_crl_delete_path(name, crl)) + + def delete_ca_crl_all(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_crl_delete_all_path(name)) + + def set_ca_description(self, name: str, description: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_ca_description_path(name, description)) + + def delete_ca_description(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_description_delete_path(name)) + + def set_ca_private_key(self, name: str, key: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_ca_private_key_path(name, key)) + + def delete_ca_private_key(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_private_key_delete_path(name)) + + def set_ca_private_password_protected(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_ca_private_password_protected_path(name)) + + def delete_ca_private_password_protected(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_private_password_protected_path(name)) + + def set_ca_revoke(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_ca_revoke_path(name)) + + def delete_ca_revoke(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_revoke_path(name)) + + def set_ca_system_install(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_ca_system_install_path(name)) + + def delete_ca_system_install(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_ca_system_install_path(name)) + + # ======================================================================== + # Certificate + # ======================================================================== + + def create_certificate(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_path(name)) + + def delete_certificate(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_delete_path(name)) + + def set_certificate_cert(self, name: str, cert: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_cert_path(name, cert)) + + def delete_certificate_cert(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_cert_delete_path(name)) + + def set_certificate_description(self, name: str, description: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_description_path(name, description)) + + def delete_certificate_description(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_description_delete_path(name)) + + def set_certificate_private_key(self, name: str, key: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_private_key_path(name, key)) + + def delete_certificate_private_key(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_private_key_delete_path(name)) + + def set_certificate_private_password_protected(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_private_password_protected_path(name)) + + def delete_certificate_private_password_protected(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_private_password_protected_path(name)) + + def set_certificate_revoke(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_revoke_path(name)) + + def delete_certificate_revoke(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_revoke_path(name)) + + # ACME + def set_certificate_acme(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_acme_path(name)) + + def delete_certificate_acme(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_acme_delete_path(name)) + + def set_certificate_acme_domain_name(self, name: str, domain: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_acme_domain_name_path(name, domain)) + + def delete_certificate_acme_domain_name(self, name: str, domain: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_acme_domain_name_delete_path(name, domain)) + + def delete_certificate_acme_domain_name_all(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_acme_domain_name_delete_all_path(name)) + + def set_certificate_acme_email(self, name: str, email: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_acme_email_path(name, email)) + + def delete_certificate_acme_email(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_acme_email_delete_path(name)) + + def set_certificate_acme_listen_address(self, name: str, address: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_acme_listen_address_path(name, address)) + + def delete_certificate_acme_listen_address(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_acme_listen_address_delete_path(name)) + + def set_certificate_acme_rsa_key_size(self, name: str, size: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_acme_rsa_key_size_path(name, size)) + + def delete_certificate_acme_rsa_key_size(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_acme_rsa_key_size_delete_path(name)) + + def set_certificate_acme_url(self, name: str, url: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_certificate_acme_url_path(name, url)) + + def delete_certificate_acme_url(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_certificate_acme_url_delete_path(name)) + + # ======================================================================== + # Diffie-Hellman (DH) + # ======================================================================== + + def create_dh(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_dh_path(name)) + + def delete_dh(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_dh_delete_path(name)) + + def set_dh_parameters(self, name: str, parameters: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_dh_parameters_path(name, parameters)) + + def delete_dh_parameters(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_dh_parameters_delete_path(name)) + + # ======================================================================== + # Key Pair + # ======================================================================== + + def create_key_pair(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_key_pair_path(name)) + + def delete_key_pair(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_key_pair_delete_path(name)) + + def set_key_pair_private_key(self, name: str, key: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_key_pair_private_key_path(name, key)) + + def delete_key_pair_private_key(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_key_pair_private_key_delete_path(name)) + + def set_key_pair_private_password_protected(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_key_pair_private_password_protected_path(name)) + + def delete_key_pair_private_password_protected(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_key_pair_private_password_protected_path(name)) + + def set_key_pair_public_key(self, name: str, key: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_key_pair_public_key_path(name, key)) + + def delete_key_pair_public_key(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_key_pair_public_key_delete_path(name)) + + # ======================================================================== + # OpenSSH + # ======================================================================== + + def create_openssh(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_openssh_path(name)) + + def delete_openssh(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_openssh_delete_path(name)) + + def set_openssh_private_key(self, name: str, key: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_openssh_private_key_path(name, key)) + + def delete_openssh_private_key(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_openssh_private_key_delete_path(name)) + + def set_openssh_private_password_protected(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_openssh_private_password_protected_path(name)) + + def delete_openssh_private_password_protected(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_openssh_private_password_protected_path(name)) + + def set_openssh_public_key(self, name: str, key: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_openssh_public_key_path(name, key)) + + def delete_openssh_public_key(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_openssh_public_key_delete_path(name)) + + def set_openssh_public_type(self, name: str, key_type: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_openssh_public_type_path(name, key_type)) + + def delete_openssh_public_type(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_openssh_public_type_delete_path(name)) + + # ======================================================================== + # OpenVPN Shared Secret + # ======================================================================== + + def create_openvpn_shared_secret(self, name: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_openvpn_shared_secret_path(name)) + + def delete_openvpn_shared_secret(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_openvpn_shared_secret_delete_path(name)) + + def set_openvpn_shared_secret_key(self, name: str, key: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_openvpn_shared_secret_key_path(name, key)) + + def delete_openvpn_shared_secret_key(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_openvpn_shared_secret_key_delete_path(name)) + + def set_openvpn_shared_secret_version(self, name: str, version: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_openvpn_shared_secret_version_path(name, version)) + + def delete_openvpn_shared_secret_version(self, name: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_openvpn_shared_secret_version_delete_path(name)) + + # ======================================================================== + # X509 Defaults + # ======================================================================== + + def set_x509_default_country(self, _unused: str, country: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_x509_default_country_path(country)) + + def delete_x509_default_country(self, _unused: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_x509_default_country_delete_path()) + + def set_x509_default_locality(self, _unused: str, locality: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_x509_default_locality_path(locality)) + + def delete_x509_default_locality(self, _unused: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_x509_default_locality_delete_path()) + + def set_x509_default_organization(self, _unused: str, organization: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_x509_default_organization_path(organization)) + + def delete_x509_default_organization(self, _unused: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_x509_default_organization_delete_path()) + + def set_x509_default_state(self, _unused: str, state: str) -> "PKIBatchBuilder": + return self.add_set(self.mappers[self.mapper_key].get_x509_default_state_path(state)) + + def delete_x509_default_state(self, _unused: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_x509_default_state_delete_path()) + + def delete_x509_defaults(self, _unused: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_x509_default_delete_path()) + + # ======================================================================== + # Delete entire PKI + # ======================================================================== + + def delete_pki(self, _unused: str) -> "PKIBatchBuilder": + return self.add_delete(self.mappers[self.mapper_key].get_delete_all_path()) + + # ======================================================================== + # Capabilities + # ======================================================================== + + def get_capabilities(self) -> Dict[str, Any]: + is_v14 = "1.4" in self.version + is_v15 = "1.5" in self.version or "latest" in self.version + + return { + "version": self.version, + "version_info": { + "is_1_4": is_v14, + "is_1_5": is_v15, + }, + "features": { + "ca": { + "supported": True, + "description": "Certificate Authority management", + "settings": ["certificate", "crl", "description", "private_key", + "password_protected", "revoke", "system_install"], + }, + "certificate": { + "supported": True, + "description": "Certificate management", + "settings": ["certificate", "description", "private_key", + "password_protected", "revoke"], + }, + "acme": { + "supported": True, + "description": "ACME certificate management (Let's Encrypt)", + "settings": ["domain_name", "email", "listen_address", + "rsa_key_size", "url"], + "rsa_key_sizes": ["2048", "3072", "4096"], + "listen_address_ipv6": is_v15, + }, + "dh": { + "supported": True, + "description": "Diffie-Hellman parameters", + }, + "key_pair": { + "supported": True, + "description": "Public and private key pairs", + "settings": ["private_key", "password_protected", "public_key"], + }, + "openssh": { + "supported": True, + "description": "OpenSSH key management", + "settings": ["private_key", "password_protected", "public_key", "public_type"], + "public_types": ["ssh-rsa"], + }, + "openvpn_shared_secret": { + "supported": True, + "description": "OpenVPN shared secret keys", + "settings": ["key", "version"], + }, + "x509_defaults": { + "supported": True, + "description": "X509 default values for certificate generation", + "settings": ["country", "locality", "organization", "state"], + }, + }, + } diff --git a/backend/vyos_mappers/__init__.py b/backend/vyos_mappers/__init__.py index 79da4bd..4722623 100644 --- a/backend/vyos_mappers/__init__.py +++ b/backend/vyos_mappers/__init__.py @@ -70,6 +70,8 @@ from .ipsec.ipsec_versions import get_ipsec_mapper from .l2tp import L2TPMapper from .l2tp.l2tp_versions import get_l2tp_mapper +from .pki import PKIMapper +from .pki.pki_versions import get_pki_mapper # Auto-register all mappers # Ethernet uses factory for version-specific mappers @@ -156,6 +158,8 @@ CommandMapperRegistry.register_feature("ipsec", get_ipsec_mapper) # L2TP uses factory for version-specific mappers CommandMapperRegistry.register_feature("l2tp", get_l2tp_mapper) +# PKI uses factory for version-specific mappers +CommandMapperRegistry.register_feature("pki", get_pki_mapper) __all__ = [ "BaseFeatureMapper", @@ -204,4 +208,5 @@ "MplsMapper", "IPSecMapper", "L2TPMapper", + "PKIMapper", ] diff --git a/backend/vyos_mappers/pki/__init__.py b/backend/vyos_mappers/pki/__init__.py new file mode 100644 index 0000000..e629c0d --- /dev/null +++ b/backend/vyos_mappers/pki/__init__.py @@ -0,0 +1,5 @@ +"""PKI Command Mapper package.""" + +from .pki import PKIMapper + +__all__ = ["PKIMapper"] diff --git a/backend/vyos_mappers/pki/pki.py b/backend/vyos_mappers/pki/pki.py new file mode 100644 index 0000000..267a725 --- /dev/null +++ b/backend/vyos_mappers/pki/pki.py @@ -0,0 +1,440 @@ +""" +PKI (Public Key Infrastructure) Command Mapper + +Handles command path generation and config parsing for PKI. +The PKI command tree is nearly identical between VyOS 1.4 and 1.5. +Only difference: VyOS 1.5 ACME listen-address supports IPv6. +""" + +from typing import List, Dict, Any, Optional + + +class PKIMapper: + """Base mapper with all PKI operations.""" + + def __init__(self, version: str): + self.version = version + + # ======================================================================== + # Base path + # ======================================================================== + + def get_base_path(self) -> List[str]: + return ["pki"] + + # ======================================================================== + # Certificate Authority (CA) + # ======================================================================== + + def get_ca_path(self, name: str) -> List[str]: + return ["pki", "ca", name] + + def get_ca_delete_path(self, name: str) -> List[str]: + return ["pki", "ca", name] + + def get_ca_certificate_path(self, name: str, cert: str) -> List[str]: + return ["pki", "ca", name, "certificate", cert] + + def get_ca_certificate_delete_path(self, name: str) -> List[str]: + return ["pki", "ca", name, "certificate"] + + def get_ca_crl_path(self, name: str, crl: str) -> List[str]: + return ["pki", "ca", name, "crl", crl] + + def get_ca_crl_delete_path(self, name: str, crl: str) -> List[str]: + return ["pki", "ca", name, "crl", crl] + + def get_ca_crl_delete_all_path(self, name: str) -> List[str]: + return ["pki", "ca", name, "crl"] + + def get_ca_description_path(self, name: str, description: str) -> List[str]: + return ["pki", "ca", name, "description", description] + + def get_ca_description_delete_path(self, name: str) -> List[str]: + return ["pki", "ca", name, "description"] + + def get_ca_private_key_path(self, name: str, key: str) -> List[str]: + return ["pki", "ca", name, "private", "key", key] + + def get_ca_private_key_delete_path(self, name: str) -> List[str]: + return ["pki", "ca", name, "private", "key"] + + def get_ca_private_password_protected_path(self, name: str) -> List[str]: + return ["pki", "ca", name, "private", "password-protected"] + + def get_ca_revoke_path(self, name: str) -> List[str]: + return ["pki", "ca", name, "revoke"] + + def get_ca_system_install_path(self, name: str) -> List[str]: + return ["pki", "ca", name, "system-install"] + + # ======================================================================== + # Certificate + # ======================================================================== + + def get_certificate_path(self, name: str) -> List[str]: + return ["pki", "certificate", name] + + def get_certificate_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name] + + def get_certificate_cert_path(self, name: str, cert: str) -> List[str]: + return ["pki", "certificate", name, "certificate", cert] + + def get_certificate_cert_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "certificate"] + + def get_certificate_description_path(self, name: str, description: str) -> List[str]: + return ["pki", "certificate", name, "description", description] + + def get_certificate_description_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "description"] + + def get_certificate_private_key_path(self, name: str, key: str) -> List[str]: + return ["pki", "certificate", name, "private", "key", key] + + def get_certificate_private_key_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "private", "key"] + + def get_certificate_private_password_protected_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "private", "password-protected"] + + def get_certificate_revoke_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "revoke"] + + # ACME + def get_certificate_acme_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "acme"] + + def get_certificate_acme_domain_name_path(self, name: str, domain: str) -> List[str]: + return ["pki", "certificate", name, "acme", "domain-name", domain] + + def get_certificate_acme_domain_name_delete_path(self, name: str, domain: str) -> List[str]: + return ["pki", "certificate", name, "acme", "domain-name", domain] + + def get_certificate_acme_domain_name_delete_all_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "acme", "domain-name"] + + def get_certificate_acme_email_path(self, name: str, email: str) -> List[str]: + return ["pki", "certificate", name, "acme", "email", email] + + def get_certificate_acme_email_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "acme", "email"] + + def get_certificate_acme_listen_address_path(self, name: str, address: str) -> List[str]: + return ["pki", "certificate", name, "acme", "listen-address", address] + + def get_certificate_acme_listen_address_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "acme", "listen-address"] + + def get_certificate_acme_rsa_key_size_path(self, name: str, size: str) -> List[str]: + return ["pki", "certificate", name, "acme", "rsa-key-size", size] + + def get_certificate_acme_rsa_key_size_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "acme", "rsa-key-size"] + + def get_certificate_acme_url_path(self, name: str, url: str) -> List[str]: + return ["pki", "certificate", name, "acme", "url", url] + + def get_certificate_acme_url_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "acme", "url"] + + def get_certificate_acme_delete_path(self, name: str) -> List[str]: + return ["pki", "certificate", name, "acme"] + + # ======================================================================== + # Diffie-Hellman (DH) + # ======================================================================== + + def get_dh_path(self, name: str) -> List[str]: + return ["pki", "dh", name] + + def get_dh_delete_path(self, name: str) -> List[str]: + return ["pki", "dh", name] + + def get_dh_parameters_path(self, name: str, parameters: str) -> List[str]: + return ["pki", "dh", name, "parameters", parameters] + + def get_dh_parameters_delete_path(self, name: str) -> List[str]: + return ["pki", "dh", name, "parameters"] + + # ======================================================================== + # Key Pair + # ======================================================================== + + def get_key_pair_path(self, name: str) -> List[str]: + return ["pki", "key-pair", name] + + def get_key_pair_delete_path(self, name: str) -> List[str]: + return ["pki", "key-pair", name] + + def get_key_pair_private_key_path(self, name: str, key: str) -> List[str]: + return ["pki", "key-pair", name, "private", "key", key] + + def get_key_pair_private_key_delete_path(self, name: str) -> List[str]: + return ["pki", "key-pair", name, "private", "key"] + + def get_key_pair_private_password_protected_path(self, name: str) -> List[str]: + return ["pki", "key-pair", name, "private", "password-protected"] + + def get_key_pair_public_key_path(self, name: str, key: str) -> List[str]: + return ["pki", "key-pair", name, "public", "key", key] + + def get_key_pair_public_key_delete_path(self, name: str) -> List[str]: + return ["pki", "key-pair", name, "public", "key"] + + # ======================================================================== + # OpenSSH + # ======================================================================== + + def get_openssh_path(self, name: str) -> List[str]: + return ["pki", "openssh", name] + + def get_openssh_delete_path(self, name: str) -> List[str]: + return ["pki", "openssh", name] + + def get_openssh_private_key_path(self, name: str, key: str) -> List[str]: + return ["pki", "openssh", name, "private", "key", key] + + def get_openssh_private_key_delete_path(self, name: str) -> List[str]: + return ["pki", "openssh", name, "private", "key"] + + def get_openssh_private_password_protected_path(self, name: str) -> List[str]: + return ["pki", "openssh", name, "private", "password-protected"] + + def get_openssh_public_key_path(self, name: str, key: str) -> List[str]: + return ["pki", "openssh", name, "public", "key", key] + + def get_openssh_public_key_delete_path(self, name: str) -> List[str]: + return ["pki", "openssh", name, "public", "key"] + + def get_openssh_public_type_path(self, name: str, key_type: str) -> List[str]: + return ["pki", "openssh", name, "public", "type", key_type] + + def get_openssh_public_type_delete_path(self, name: str) -> List[str]: + return ["pki", "openssh", name, "public", "type"] + + # ======================================================================== + # OpenVPN Shared Secret + # ======================================================================== + + def get_openvpn_shared_secret_path(self, name: str) -> List[str]: + return ["pki", "openvpn", "shared-secret", name] + + def get_openvpn_shared_secret_delete_path(self, name: str) -> List[str]: + return ["pki", "openvpn", "shared-secret", name] + + def get_openvpn_shared_secret_key_path(self, name: str, key: str) -> List[str]: + return ["pki", "openvpn", "shared-secret", name, "key", key] + + def get_openvpn_shared_secret_key_delete_path(self, name: str) -> List[str]: + return ["pki", "openvpn", "shared-secret", name, "key"] + + def get_openvpn_shared_secret_version_path(self, name: str, version: str) -> List[str]: + return ["pki", "openvpn", "shared-secret", name, "version", version] + + def get_openvpn_shared_secret_version_delete_path(self, name: str) -> List[str]: + return ["pki", "openvpn", "shared-secret", name, "version"] + + # ======================================================================== + # X509 Defaults + # ======================================================================== + + def get_x509_default_country_path(self, country: str) -> List[str]: + return ["pki", "x509", "default", "country", country] + + def get_x509_default_country_delete_path(self) -> List[str]: + return ["pki", "x509", "default", "country"] + + def get_x509_default_locality_path(self, locality: str) -> List[str]: + return ["pki", "x509", "default", "locality", locality] + + def get_x509_default_locality_delete_path(self) -> List[str]: + return ["pki", "x509", "default", "locality"] + + def get_x509_default_organization_path(self, organization: str) -> List[str]: + return ["pki", "x509", "default", "organization", organization] + + def get_x509_default_organization_delete_path(self) -> List[str]: + return ["pki", "x509", "default", "organization"] + + def get_x509_default_state_path(self, state: str) -> List[str]: + return ["pki", "x509", "default", "state", state] + + def get_x509_default_state_delete_path(self) -> List[str]: + return ["pki", "x509", "default", "state"] + + def get_x509_default_delete_path(self) -> List[str]: + return ["pki", "x509", "default"] + + # ======================================================================== + # Delete all PKI + # ======================================================================== + + def get_delete_all_path(self) -> List[str]: + return ["pki"] + + # ======================================================================== + # Config Parsing + # ======================================================================== + + def parse_config(self, full_config: Dict[str, Any]) -> Dict[str, Any]: + """Parse full VyOS config and extract PKI configuration.""" + pki_config = full_config.get("pki", {}) + + if not pki_config: + return { + "configured": False, + "ca": {}, + "certificates": {}, + "dh": {}, + "key_pairs": {}, + "openssh": {}, + "openvpn_shared_secrets": {}, + "x509_defaults": {}, + } + + return { + "configured": True, + "ca": self._parse_ca(pki_config.get("ca", {})), + "certificates": self._parse_certificates(pki_config.get("certificate", {})), + "dh": self._parse_dh(pki_config.get("dh", {})), + "key_pairs": self._parse_key_pairs(pki_config.get("key-pair", {})), + "openssh": self._parse_openssh(pki_config.get("openssh", {})), + "openvpn_shared_secrets": self._parse_openvpn(pki_config.get("openvpn", {}).get("shared-secret", {})), + "x509_defaults": self._parse_x509_defaults(pki_config.get("x509", {}).get("default", {})), + } + + def _parse_ca(self, ca_config: Dict[str, Any]) -> Dict[str, Any]: + """Parse CA entries.""" + result = {} + for name, data in ca_config.items(): + if not isinstance(data, dict): + continue + private = data.get("private", {}) + crl_data = data.get("crl", {}) + # CRL is multi-value - can be a list or a single string + if isinstance(crl_data, str): + crl_list = [crl_data] + elif isinstance(crl_data, list): + crl_list = crl_data + elif isinstance(crl_data, dict): + crl_list = list(crl_data.keys()) if crl_data else [] + else: + crl_list = [] + + result[name] = { + "name": name, + "certificate": data.get("certificate"), + "crl": crl_list, + "description": data.get("description"), + "private_key": private.get("key") if isinstance(private, dict) else None, + "password_protected": "password-protected" in private if isinstance(private, dict) else False, + "revoke": "revoke" in data, + "system_install": "system-install" in data, + } + return result + + def _parse_certificates(self, cert_config: Dict[str, Any]) -> Dict[str, Any]: + """Parse certificate entries.""" + result = {} + for name, data in cert_config.items(): + if not isinstance(data, dict): + continue + private = data.get("private", {}) + acme = data.get("acme", {}) + + # ACME domain-name is multi-value + acme_domains = acme.get("domain-name", []) if isinstance(acme, dict) else [] + if isinstance(acme_domains, str): + acme_domains = [acme_domains] + + acme_config = None + if isinstance(acme, dict) and acme: + acme_config = { + "domain_names": acme_domains, + "email": acme.get("email"), + "listen_address": acme.get("listen-address"), + "rsa_key_size": acme.get("rsa-key-size"), + "url": acme.get("url"), + } + + result[name] = { + "name": name, + "certificate": data.get("certificate"), + "description": data.get("description"), + "private_key": private.get("key") if isinstance(private, dict) else None, + "password_protected": "password-protected" in private if isinstance(private, dict) else False, + "revoke": "revoke" in data, + "acme": acme_config, + } + return result + + def _parse_dh(self, dh_config: Dict[str, Any]) -> Dict[str, Any]: + """Parse DH parameter entries.""" + result = {} + for name, data in dh_config.items(): + if not isinstance(data, dict): + continue + result[name] = { + "name": name, + "parameters": data.get("parameters"), + } + return result + + def _parse_key_pairs(self, kp_config: Dict[str, Any]) -> Dict[str, Any]: + """Parse key pair entries.""" + result = {} + for name, data in kp_config.items(): + if not isinstance(data, dict): + continue + private = data.get("private", {}) + public = data.get("public", {}) + result[name] = { + "name": name, + "private_key": private.get("key") if isinstance(private, dict) else None, + "password_protected": "password-protected" in private if isinstance(private, dict) else False, + "public_key": public.get("key") if isinstance(public, dict) else None, + } + return result + + def _parse_openssh(self, openssh_config: Dict[str, Any]) -> Dict[str, Any]: + """Parse OpenSSH key entries.""" + result = {} + for name, data in openssh_config.items(): + if not isinstance(data, dict): + continue + private = data.get("private", {}) + public = data.get("public", {}) + result[name] = { + "name": name, + "private_key": private.get("key") if isinstance(private, dict) else None, + "password_protected": "password-protected" in private if isinstance(private, dict) else False, + "public_key": public.get("key") if isinstance(public, dict) else None, + "public_type": public.get("type") if isinstance(public, dict) else None, + } + return result + + def _parse_openvpn(self, openvpn_config: Dict[str, Any]) -> Dict[str, Any]: + """Parse OpenVPN shared secret entries.""" + result = {} + for name, data in openvpn_config.items(): + if not isinstance(data, dict): + continue + result[name] = { + "name": name, + "key": data.get("key"), + "version": data.get("version"), + } + return result + + def _parse_x509_defaults(self, x509_config: Dict[str, Any]) -> Dict[str, Any]: + """Parse X509 default values.""" + if not x509_config: + return {} + return { + "country": x509_config.get("country"), + "locality": x509_config.get("locality"), + "organization": x509_config.get("organization"), + "state": x509_config.get("state"), + } diff --git a/backend/vyos_mappers/pki/pki_versions/__init__.py b/backend/vyos_mappers/pki/pki_versions/__init__.py new file mode 100644 index 0000000..cca6aea --- /dev/null +++ b/backend/vyos_mappers/pki/pki_versions/__init__.py @@ -0,0 +1,36 @@ +"""Factory for version-specific PKI mappers.""" + +from ..pki import PKIMapper +from .v1_4 import PKIMapperV1_4 +from .v1_5 import PKIMapperV1_5 + + +def get_pki_mapper(version: str) -> PKIMapper: + """ + Factory function to get appropriate mapper for version. + + The PKI command tree is nearly identical between VyOS 1.4 and 1.5. + Only difference: VyOS 1.5 ACME listen-address supports IPv6. + + Args: + version: VyOS version string (e.g., "1.4", "1.5") + + Returns: + Merged mapper with base and version-specific methods + """ + base = PKIMapper(version) + + if "1.4" in version: + version_specific = PKIMapperV1_4() + elif "1.5" in version or "latest" in version: + version_specific = PKIMapperV1_5() + else: + version_specific = PKIMapperV1_5() # Default to latest + + class MergedMapper: + def __getattr__(self, name): + if hasattr(version_specific, name): + return getattr(version_specific, name) + return getattr(base, name) + + return MergedMapper() diff --git a/backend/vyos_mappers/pki/pki_versions/v1_4.py b/backend/vyos_mappers/pki/pki_versions/v1_4.py new file mode 100644 index 0000000..a79200b --- /dev/null +++ b/backend/vyos_mappers/pki/pki_versions/v1_4.py @@ -0,0 +1,10 @@ +"""VyOS 1.4 specific PKI mapper overrides.""" + + +class PKIMapperV1_4: + """ + VyOS 1.4 PKI specifics. + + ACME listen-address only supports IPv4 on VyOS 1.4. + """ + pass diff --git a/backend/vyos_mappers/pki/pki_versions/v1_5.py b/backend/vyos_mappers/pki/pki_versions/v1_5.py new file mode 100644 index 0000000..2a2b420 --- /dev/null +++ b/backend/vyos_mappers/pki/pki_versions/v1_5.py @@ -0,0 +1,10 @@ +"""VyOS 1.5 specific PKI mapper overrides.""" + + +class PKIMapperV1_5: + """ + VyOS 1.5 PKI specifics. + + ACME listen-address supports both IPv4 and IPv6 on VyOS 1.5. + """ + pass diff --git a/frontend/src/app/pki/page.tsx b/frontend/src/app/pki/page.tsx new file mode 100644 index 0000000..ae3824f --- /dev/null +++ b/frontend/src/app/pki/page.tsx @@ -0,0 +1,826 @@ +"use client"; + +import { useState, useEffect } from "react"; +import { AppLayout } from "@/components/layout/AppLayout"; +import { Button } from "@/components/ui/button"; +import { Card } from "@/components/ui/card"; +import { Badge } from "@/components/ui/badge"; +import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from "@/components/ui/table"; +import { + ShieldCheck, + Plus, + RefreshCw, + Loader2, + AlertCircle, + Pencil, + Trash2, + FileText, + Key, + Terminal, + Lock, + Settings, + Eye, +} from "lucide-react"; +import { cn } from "@/lib/utils"; +import { + pkiService, + type PKIConfigResponse, + type PKICapabilities, + type PKICA, + type PKICertificate, + type PKIDH, + type PKIKeyPair, + type PKIOpenSSH, + type PKIOpenVPNSharedSecret, +} from "@/lib/api/pki"; +import { usePermissions } from "@/hooks/usePermissions"; +import { FeatureGroup } from "@/lib/api/user-management"; +import { + CAModal, + CertificateModal, + DHModal, + KeyPairModal, + OpenSSHModal, + OpenVPNSecretModal, + X509DefaultsModal, + DeletePKIItemModal, + PKIDetailSheet, + type PKIViewingItem, +} from "@/components/pki"; + +export default function PKIPage() { + const { canRead, canWrite } = usePermissions(); + const hasRead = canRead(FeatureGroup.PKI); + const hasWrite = canWrite(FeatureGroup.PKI); + + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [config, setConfig] = useState(null); + const [capabilities, setCapabilities] = useState(null); + const [activeTab, setActiveTab] = useState("certificates"); + + // Modal state - CA + const [showCAModal, setShowCAModal] = useState(false); + const [editingCA, setEditingCA] = useState(null); + + // Modal state - Certificate + const [showCertModal, setShowCertModal] = useState(false); + const [editingCert, setEditingCert] = useState(null); + + // Modal state - DH + const [showDHModal, setShowDHModal] = useState(false); + const [editingDH, setEditingDH] = useState(null); + + // Modal state - Key Pair + const [showKeyPairModal, setShowKeyPairModal] = useState(false); + const [editingKeyPair, setEditingKeyPair] = useState(null); + + // Modal state - OpenSSH + const [showOpenSSHModal, setShowOpenSSHModal] = useState(false); + const [editingOpenSSH, setEditingOpenSSH] = useState(null); + + // Modal state - OpenVPN + const [showOpenVPNModal, setShowOpenVPNModal] = useState(false); + const [editingOpenVPN, setEditingOpenVPN] = useState(null); + + // Modal state - X509 + const [showX509Modal, setShowX509Modal] = useState(false); + + // Delete modal + const [deleteTarget, setDeleteTarget] = useState<{ + type: string; + name: string; + onDelete: () => Promise; + } | null>(null); + + // Detail sheet + const [viewingItem, setViewingItem] = useState(null); + + const fetchConfig = async (refresh = false) => { + try { + setLoading(true); + setError(null); + const [configData, capsData] = await Promise.all([ + pkiService.getConfig(refresh), + pkiService.getCapabilities(), + ]); + setConfig(configData); + setCapabilities(capsData); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed to load PKI configuration"); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + if (hasRead) fetchConfig(); + }, [hasRead]); + + const onSuccess = () => fetchConfig(true); + + // Loading state + if (loading && !config) { + return ( + +
+
+ +

Loading PKI configuration...

+
+
+
+ ); + } + + // Error state + if (error && !config) { + return ( + +
+
+ +

Failed to load configuration

+

{error}

+ +
+
+
+ ); + } + + const totals = config?.totals; + + return ( + +
+ {/* Header */} +
+
+
+
+ +
+
+
+

PKI Management

+ {config?.configured ? ( + Configured + ) : ( + Not Configured + )} +
+

+ Manage certificates, keys, and PKI infrastructure +

+
+
+ +
+ + {/* Stats */} +
+ +
+ + CAs +
+

{totals?.ca ?? 0}

+
+ +
+ + Certificates +
+

{totals?.certificates ?? 0}

+
+ +
+ + DH Params +
+

{totals?.dh ?? 0}

+
+ +
+ + Key Pairs +
+

{totals?.key_pairs ?? 0}

+
+ +
+ + OpenSSH +
+

{totals?.openssh ?? 0}

+
+ +
+ + OpenVPN +
+

{totals?.openvpn_shared_secrets ?? 0}

+
+
+
+ + {/* Tabs Content */} +
+ + + Certificates + Certificate Authorities + Key Pairs + DH Parameters + OpenSSH + OpenVPN + X509 Defaults + + + {/* Certificates Tab */} + +
+

Certificates

+ {hasWrite && ( + + )} +
+ + + + + Name + Type + Certificate + Private Key + Description + Status + Actions + + + + {(config?.certificates || []).length === 0 ? ( + + + No certificates configured + + + ) : ( + config?.certificates.map((cert) => ( + + {cert.name} + + {cert.acme ? ( + ACME + ) : ( + Manual + )} + + + {cert.certificate ? ( + Present + ) : ( + Not set + )} + + + {cert.private_key ? ( + Present + ) : ( + Not set + )} + + {cert.description || "—"} + +
+ {cert.revoke && Revoked} + {cert.password_protected && Protected} +
+
+ +
+ + {hasWrite && ( + <> + + + + )} +
+
+
+ )) + )} +
+
+
+
+ + {/* Certificate Authorities Tab */} + +
+

Certificate Authorities

+ {hasWrite && ( + + )} +
+ + + + + Name + Certificate + Private Key + Description + Status + Actions + + + + {(config?.ca || []).length === 0 ? ( + + + No certificate authorities configured + + + ) : ( + config?.ca.map((ca) => ( + + {ca.name} + + {ca.certificate ? ( + Present + ) : ( + Not set + )} + + + {ca.private_key ? ( + Present + ) : ( + Not set + )} + + {ca.description || "—"} + +
+ {ca.revoke && Revoked} + {ca.system_install && System Install} + {ca.password_protected && Protected} + {ca.crl?.length > 0 && CRL ({ca.crl.length})} +
+
+ +
+ + {hasWrite && ( + <> + + + + )} +
+
+
+ )) + )} +
+
+
+
+ + {/* Key Pairs Tab */} + +
+

Key Pairs

+ {hasWrite && ( + + )} +
+ + + + + Name + Private Key + Public Key + Status + Actions + + + + {(config?.key_pairs || []).length === 0 ? ( + + + No key pairs configured + + + ) : ( + config?.key_pairs.map((kp) => ( + + {kp.name} + + {kp.private_key ? ( + Present + ) : ( + Not set + )} + + + {kp.public_key ? ( + Present + ) : ( + Not set + )} + + + {kp.password_protected && Protected} + + +
+ + {hasWrite && ( + <> + + + + )} +
+
+
+ )) + )} +
+
+
+
+ + {/* DH Parameters Tab */} + +
+

DH Parameters

+ {hasWrite && ( + + )} +
+ + + + + Name + Parameters + Actions + + + + {(config?.dh || []).length === 0 ? ( + + + No DH parameters configured + + + ) : ( + config?.dh.map((dh) => ( + + {dh.name} + + {dh.parameters ? ( + Present + ) : ( + Not set + )} + + +
+ + {hasWrite && ( + <> + + + + )} +
+
+
+ )) + )} +
+
+
+
+ + {/* OpenSSH Tab */} + +
+

OpenSSH Keys

+ {hasWrite && ( + + )} +
+ + + + + Name + Private Key + Public Key + Type + Status + Actions + + + + {(config?.openssh || []).length === 0 ? ( + + + No OpenSSH keys configured + + + ) : ( + config?.openssh.map((ssh) => ( + + {ssh.name} + + {ssh.private_key ? ( + Present + ) : ( + Not set + )} + + + {ssh.public_key ? ( + Present + ) : ( + Not set + )} + + {ssh.public_type || "—"} + + {ssh.password_protected && Protected} + + +
+ + {hasWrite && ( + <> + + + + )} +
+
+
+ )) + )} +
+
+
+
+ + {/* OpenVPN Tab */} + +
+

OpenVPN Shared Secrets

+ {hasWrite && ( + + )} +
+ + + + + Name + Key + Version + Actions + + + + {(config?.openvpn_shared_secrets || []).length === 0 ? ( + + + No OpenVPN shared secrets configured + + + ) : ( + config?.openvpn_shared_secrets.map((secret) => ( + + {secret.name} + + {secret.key ? ( + Present + ) : ( + Not set + )} + + {secret.version || "—"} + +
+ + {hasWrite && ( + <> + + + + )} +
+
+
+ )) + )} +
+
+
+
+ + {/* X509 Defaults Tab */} + +
+

X509 Defaults

+ {hasWrite && ( + + )} +
+ +
+
+

Country

+

{config?.x509_defaults?.country || "—"}

+
+
+

State

+

{config?.x509_defaults?.state || "—"}

+
+
+

Locality

+

{config?.x509_defaults?.locality || "—"}

+
+
+

Organization

+

{config?.x509_defaults?.organization || "—"}

+
+
+
+
+
+
+
+ + {/* Detail Sheet */} + setViewingItem(null)} /> + + {/* Modals */} + { setShowCAModal(v); if (!v) setEditingCA(null); }} + onSuccess={onSuccess} + existingCA={editingCA} + x509Defaults={config?.x509_defaults || {}} + /> + + { setShowCertModal(v); if (!v) setEditingCert(null); }} + onSuccess={onSuccess} + existingCert={editingCert} + capabilities={capabilities} + availableCAs={config?.ca || []} + x509Defaults={config?.x509_defaults || {}} + /> + + { setShowDHModal(v); if (!v) setEditingDH(null); }} + onSuccess={onSuccess} + existingDH={editingDH} + /> + + { setShowKeyPairModal(v); if (!v) setEditingKeyPair(null); }} + onSuccess={onSuccess} + existingKeyPair={editingKeyPair} + /> + + { setShowOpenSSHModal(v); if (!v) setEditingOpenSSH(null); }} + onSuccess={onSuccess} + existingKey={editingOpenSSH} + /> + + { setShowOpenVPNModal(v); if (!v) setEditingOpenVPN(null); }} + onSuccess={onSuccess} + existingSecret={editingOpenVPN} + /> + + + + {deleteTarget && ( + { if (!v) setDeleteTarget(null); }} + onSuccess={onSuccess} + itemType={deleteTarget.type} + itemName={deleteTarget.name} + onDelete={deleteTarget.onDelete} + /> + )} +
+ ); +} diff --git a/frontend/src/components/layout/Sidebar.tsx b/frontend/src/components/layout/Sidebar.tsx index 57d714c..395a930 100644 --- a/frontend/src/components/layout/Sidebar.tsx +++ b/frontend/src/components/layout/Sidebar.tsx @@ -10,7 +10,7 @@ import { CollapsibleContent, CollapsibleTrigger, } from "@/components/ui/collapsible"; -import { Activity, ChevronDown, HeartPulse, Shield, Network, Server, Settings, LayoutDashboard, Route, Lock, LogOut, User, FileText, Building2, Power, PowerOff, Scale } from "lucide-react"; +import { Activity, ChevronDown, HeartPulse, Shield, ShieldCheck, Network, Server, Settings, LayoutDashboard, Route, Lock, LogOut, User, FileText, Building2, Power, PowerOff, Scale } from "lucide-react"; import { ScrollArea } from "@/components/ui/scroll-area"; import { Button } from "@/components/ui/button"; import { useSession, signOut } from "@/lib/auth-client"; @@ -134,6 +134,12 @@ const navigation: NavItem[] = [ }, ], }, + { + title: "PKI", + href: "/pki", + icon: ShieldCheck, + requiredPermission: FeatureGroup.PKI, + }, { title: "Policies", icon: FileText, diff --git a/frontend/src/components/pki/CAModal.tsx b/frontend/src/components/pki/CAModal.tsx new file mode 100644 index 0000000..61b8511 --- /dev/null +++ b/frontend/src/components/pki/CAModal.tsx @@ -0,0 +1,464 @@ +"use client"; + +import { useState, useEffect } from "react"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { Textarea } from "@/components/ui/textarea"; +import { Checkbox } from "@/components/ui/checkbox"; +import { ScrollArea } from "@/components/ui/scroll-area"; +import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { AlertCircle, Loader2, ShieldCheck, Info } from "lucide-react"; +import { pkiService, PKICA, PKIX509Defaults } from "@/lib/api/pki"; +import { ApiError } from "@/lib/types/api"; + +interface CAModalProps { + open: boolean; + onOpenChange: (open: boolean) => void; + onSuccess: () => void; + existingCA: PKICA | null; + x509Defaults: PKIX509Defaults; +} + +export function CAModal({ open, onOpenChange, onSuccess, existingCA, x509Defaults }: CAModalProps) { + const isEdit = !!existingCA; + + const [mode, setMode] = useState<"import" | "generate">("import"); + + // Import fields + const [name, setName] = useState(""); + const [certificate, setCertificate] = useState(""); + const [description, setDescription] = useState(""); + const [privateKey, setPrivateKey] = useState(""); + const [passwordProtected, setPasswordProtected] = useState(false); + const [crl, setCrl] = useState(""); + + // Generate fields + const [genName, setGenName] = useState(""); + const [keyType, setKeyType] = useState<"rsa" | "ec">("rsa"); + const [keySize, setKeySize] = useState("2048"); + const [country, setCountry] = useState(""); + const [state, setState] = useState(""); + const [locality, setLocality] = useState(""); + const [organization, setOrganization] = useState(""); + const [commonName, setCommonName] = useState(""); + const [days, setDays] = useState("3650"); + const [encryptKey, setEncryptKey] = useState(false); + const [passphrase, setPassphrase] = useState(""); + + // Shared fields + const [revoke, setRevoke] = useState(false); + const [systemInstall, setSystemInstall] = useState(false); + + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + const rsaKeySizes = ["2048", "3072", "4096"]; + const ecKeySizes = ["256", "384", "521"]; + + useEffect(() => { + if (open) { + if (existingCA) { + setMode("import"); + setName(existingCA.name); + setCertificate(""); + setDescription(existingCA.description || ""); + setPrivateKey(""); + setPasswordProtected(existingCA.password_protected); + setCrl(existingCA.crl?.join("\n") || ""); + setRevoke(existingCA.revoke); + setSystemInstall(existingCA.system_install); + } else { + setMode("import"); + setName(""); + setCertificate(""); + setDescription(""); + setPrivateKey(""); + setPasswordProtected(false); + setCrl(""); + setGenName(""); + setKeyType("rsa"); + setKeySize("2048"); + setCountry(x509Defaults.country || ""); + setState(x509Defaults.state || ""); + setLocality(x509Defaults.locality || ""); + setOrganization(x509Defaults.organization || ""); + setCommonName(""); + setDays("3650"); + setEncryptKey(false); + setPassphrase(""); + setRevoke(false); + setSystemInstall(false); + } + setError(null); + } + }, [open, existingCA]); + + // Update key size options when key type changes + useEffect(() => { + if (keyType === "rsa" && !rsaKeySizes.includes(keySize)) { + setKeySize("2048"); + } else if (keyType === "ec" && !ecKeySizes.includes(keySize)) { + setKeySize("256"); + } + }, [keyType]); + + const handleImportSubmit = async () => { + if (!name.trim()) { setError("Name is required"); return; } + + setLoading(true); + setError(null); + + try { + const crlList = crl.split("\n").map(s => s.trim()).filter(Boolean); + let result; + if (isEdit) { + result = await pkiService.updateCA(name.trim(), existingCA!, { + certificate: certificate || undefined, + description, + private_key: privateKey || undefined, + password_protected: passwordProtected, + crl: crlList.length > 0 ? crlList : undefined, + revoke, + system_install: systemInstall, + }); + } else { + result = await pkiService.createCA(name.trim(), { + certificate: certificate || undefined, + description: description || undefined, + private_key: privateKey || undefined, + password_protected: passwordProtected || undefined, + crl: crlList.length > 0 ? crlList : undefined, + revoke: revoke || undefined, + system_install: systemInstall || undefined, + }); + } + + if (result.success) { + onOpenChange(false); + onSuccess(); + } else { + setError(result.error || "Operation failed"); + } + } catch (err) { + setError((err as ApiError).message || "Operation failed"); + } finally { + setLoading(false); + } + }; + + const handleGenerateSubmit = async () => { + if (!genName.trim()) { setError("Name is required"); return; } + if (!commonName.trim()) { setError("Common Name is required"); return; } + if (encryptKey && !passphrase) { setError("Passphrase is required when encrypting the key"); return; } + + const daysNum = parseInt(days, 10); + if (isNaN(daysNum) || daysNum < 1) { setError("Days must be a positive number"); return; } + + setLoading(true); + setError(null); + + try { + const result = await pkiService.generateCA({ + name: genName.trim(), + key_type: keyType, + key_size: parseInt(keySize, 10), + country: country || undefined, + state: state || undefined, + locality: locality || undefined, + organization: organization || undefined, + common_name: commonName.trim(), + days: daysNum, + encrypt_key: encryptKey, + passphrase: encryptKey ? passphrase : undefined, + revoke, + system_install: systemInstall, + }); + + if (result.success) { + onOpenChange(false); + onSuccess(); + } else { + setError(result.error || "Generation failed"); + } + } catch (err) { + setError((err as ApiError).message || "Generation failed"); + } finally { + setLoading(false); + } + }; + + const handleSubmit = mode === "generate" ? handleGenerateSubmit : handleImportSubmit; + + return ( + + + + + + {isEdit ? "Edit" : "Add"} Certificate Authority + + + {isEdit ? `Editing CA: ${existingCA?.name}` : "Import an existing CA or generate a new self-signed CA"} + + + + +
+ {!isEdit && ( + { setMode(v as "import" | "generate"); setError(null); }}> + + Import + Generate + + + +
+ + setName(e.target.value)} placeholder="my-ca" /> +
+ +
+ +