Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions src/synology_dsm/api/core/certificate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""DSM Certificate data."""

import json


class SynoCoreCertificate:
"""Class containing Certificate data."""

API_CERTIFICATE_KEY = "SYNO.Core.Certificate.CRT"
API_CERTIFICATE_SERVICE_KEY = "SYNO.Core.Certificate.Service" # method='set'

def __init__(self, dsm):
"""Constructor method."""
self._dsm = dsm
self._data = {}

def update(self):
"""Updates certificate data."""
raw_data = self._dsm.get(self.API_CERTIFICATE_KEY, "list")
if raw_data:
self._data = raw_data["data"]

@property
def success(self):
"""Gets the last scan success."""
return self._data.get("success")

@property
def all(self):
"""Gets all certificate names."""
cert_list = self._data.get("certificates")
return [cert['desc'] for cert in cert_list]

@property
def self_signed(self):
"""Gets only self-signed certificate names."""
cert_list = self._data.get("certificates")
return [cert['desc'] for cert in cert_list if cert['issuer']['organization'] == 'Synology Inc.']

@property
def lets_encrypt(self):
"""Gets only Let's Encrypt certificate names."""
cert_list = self._data.get("certificates")
return [cert['desc'] for cert in cert_list if cert['issuer']['organization'] == "Let's Encrypt"]

def _services_by_certificate(self, cert_name):
"""Gets services associated with certificate `cert_name`"""
cert_list = self._data.get("certificates")
cert_list = [cert for cert in cert_list if cert['desc'] == cert_name]
if not cert_list:
return []
cert = cert_list[0]
services = [serv for serv in cert['services']]
return services

def services_by_certificate(self, cert_name):
services = [svc['service']
for svc in self._services_by_certificate(cert_name)]
return services or []

def _services(self):
"""Gets all services that can be associated with certificates"""
services = []
for cert in self.all:
services += self._services_by_certificate(cert)
return services

def services(self):
services = [svc['service'] for svc in self._services()]
return services or []

def _get_cert_id_by_cert_name(self, cert_name):
cert_id = [cert['id'] for cert in self._data.get(
"certificates") if cert['desc'] == cert_name]
if len(cert_id) != 1:
raise RuntimeError('Certificate not found')
return cert_id[0]

def _get_cert_id_by_service_name(self, service_name):
for cert in self._data.get('certificates'):
for svc in cert['services']:
if svc['service'] == service_name:
return cert['id']
raise RuntimeError(
'No certificate was found associated to the specified service')

def assign_certificate_to_service(self, cert_name, service_names):
"""Sets certificate `cert_name` to all `service_name` services"""

# Verify specified cert_name is valid
if cert_name is None or cert_name not in self.all:
raise RuntimeError('Invalid certificate name!')

# Verify specified services are valid
all_services = self.services()
valid_services = [svc for svc in service_names if svc in all_services]
if not valid_services:
raise RuntimeError('Invalid service(s) name(s)!')

# Prepare payload for the REST API
new_cert_id = self._get_cert_id_by_cert_name(cert_name)
services = self._services()
services = [{'service': svc, 'old_id': self._get_cert_id_by_service_name(svc['service']), 'id': new_cert_id, }
for svc in services if svc['service'] in valid_services]
params = {'settings': json.dumps(services)}

# Final validation
for svc in services:
if not svc['service'] or not svc['old_id'] or not svc['id']:
raise RuntimeError('Malformed service configuration!')
if svc['old_id'] == svc['id']:
raise RuntimeError('Service {} is already using the specified certificate'. format(
svc['service']['display_name']))

# REST API call
res = self._dsm.post(api=self.API_CERTIFICATE_SERVICE_KEY,
method="set",
params=params)

return res
27 changes: 24 additions & 3 deletions src/synology_dsm/synology_dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from requests.exceptions import RequestException

from .api.core.security import SynoCoreSecurity
from .api.core.certificate import SynoCoreCertificate
from .api.core.share import SynoCoreShare
from .api.core.system import SynoCoreSystem
from .api.core.upgrade import SynoCoreUpgrade
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(
self._information = None
self._network = None
self._security = None
self._certificate = None
self._share = None
self._storage = None
self._surveillance = None
Expand Down Expand Up @@ -165,7 +167,8 @@ def login(self, otp_code: str = None) -> bool:
}
raise switcher.get(
result["error"]["code"],
SynologyDSMLoginFailedException(result["error"]["code"], self.username),
SynologyDSMLoginFailedException(
result["error"]["code"], self.username),
)

# Parse result if valid
Expand All @@ -177,7 +180,8 @@ def login(self, otp_code: str = None) -> bool:
# Not available on API version < 6 && device token is given once
# per device_name
self._device_token = result["data"]["did"]
self._debuglog("Authentication successful, token: " + str(self._session_id))
self._debuglog("Authentication successful, token: "
+ str(self._session_id))

if not self._information:
self._information = SynoDSMInformation(self)
Expand Down Expand Up @@ -303,7 +307,8 @@ def _execute_request(self, method: str, url: str, params: dict, **kwargs):

if response.status_code == 200:
# We got a DSM response
content_type = response.headers.get("Content-Type", "").split(";")[0]
content_type = response.headers.get(
"Content-Type", "").split(";")[0]

if content_type in [
"application/json",
Expand Down Expand Up @@ -334,6 +339,9 @@ def update(self, with_information: bool = False, with_network: bool = False):
if self._security:
self._security.update()

if self._certificate:
self._certificate.update()

if self._utilisation:
self._utilisation.update()

Expand Down Expand Up @@ -363,6 +371,9 @@ def reset(self, api: any) -> bool:
if api == SynoCoreSecurity.API_KEY:
self._security = None
return True
if api == SynoCoreCertificate.API_KEY:
self._certificate = None
return True
if api == SynoCoreShare.API_KEY:
self._share = None
return True
Expand All @@ -387,6 +398,9 @@ def reset(self, api: any) -> bool:
if isinstance(api, SynoCoreSecurity):
self._security = None
return True
if isinstance(api, SynoCoreCertificate):
self._certificate = None
return True
if isinstance(api, SynoCoreShare):
self._share = None
return True
Expand Down Expand Up @@ -438,6 +452,13 @@ def security(self) -> SynoCoreSecurity:
self._security = SynoCoreSecurity(self)
return self._security

@property
def certificate(self) -> SynoCoreCertificate:
"""Gets NAS Certificate informations."""
if not self._certificate:
self._certificate = SynoCoreCertificate(self)
return self._certificate

@property
def share(self) -> SynoCoreShare:
"""Gets NAS shares information."""
Expand Down