diff --git a/nmostesting/Config.py b/nmostesting/Config.py index 07c484f3..51229287 100644 --- a/nmostesting/Config.py +++ b/nmostesting/Config.py @@ -170,8 +170,8 @@ # video/raw, etc. "width": 1920, "height": 1080, - "interlace": True, - "exactframerate": "25", + "interlace": False, + "exactframerate": "60", "depth": 10, "sampling": "YCbCr-4:2:2", "colorimetry": "BT709", @@ -375,6 +375,17 @@ } } }, + "bcp-004-02": { + "repo": "bcp-004-02", + "branch": "v1.0-dev", + "versions": ["v1.0-dev"], + "default_version": "v1.0-dev", + "apis": { + "sender-caps": { + "name": "Sender Capabilities" + } + } + }, "bcp-005-01": { "repo": "bcp-005-01", "versions": ["v1.0"], @@ -383,8 +394,10 @@ }, "nmos-parameter-registers": { "repo": "nmos-parameter-registers", - "versions": ["main"], - "default_version": "main", + "url": "https://github.com/alabou/", + "branch": "bcp-005-02", + "versions": ["bcp-005-02"], + "default_version": "bcp-005-02", "apis": { "caps-register": { "name": "Capabilities Register" diff --git a/nmostesting/NMOSTesting.py b/nmostesting/NMOSTesting.py index e5a25374..99f309fb 100644 --- a/nmostesting/NMOSTesting.py +++ b/nmostesting/NMOSTesting.py @@ -88,6 +88,7 @@ from .suites import BCP0050101Test from .suites import BCP0060101Test from .suites import BCP0060102Test +from .suites import BCP0050201Test from .suites import BCP00604Test @@ -431,6 +432,33 @@ }], "class": BCP0060102Test.BCP0060102Test }, + "BCP-005-02-01": { + "name": "BCP-005-02 NMOS With IPMX/HKEP", + "specs": [{ + "spec_key": "is-04", + "api_key": "node" + }, { + "spec_key": "is-05", + "api_key": "connection" + }], + "extra_specs": [{ + "spec_key": "nmos-parameter-registers", + "api_key": "flow-register" + }, { + "spec_key": "nmos-parameter-registers", + "api_key": "sender-register" + }, { + "spec_key": "nmos-parameter-registers", + "api_key": "caps-register" + }, { + "spec_key": "bcp-004-01", + "api_key": "receiver-caps" + }, { + "spec_key": "bcp-004-02", + "api_key": "sender-caps" + }], + "class": BCP0050201Test.BCP0050201Test + }, "BCP-006-04": { "name": "BCP-006-04 NMOS With MPEG TS", "specs": [{ @@ -730,8 +758,9 @@ def init_spec_cache(): if repo_data["repo"] is None: continue if not os.path.exists(path): - print(" * Initialising repository '{}'".format(repo_data["repo"])) - repo = git.Repo.clone_from('https://github.com/AMWA-TV/' + repo_data["repo"] + '.git', path) + repo_url = repo_data.get("url", "https://github.com/AMWA-TV/") + print(" * Initialising repository '{}' at url '{}'".format(repo_data["repo"], repo_url)) + repo = git.Repo.clone_from(repo_url + repo_data["repo"] + '.git', path) update_last_pull = True else: repo = git.Repo(path) @@ -1272,3 +1301,4 @@ def main(args): # Exit the application with the desired code sys.exit(exit_code) + diff --git a/nmostesting/suites/BCP0050201Test.py b/nmostesting/suites/BCP0050201Test.py new file mode 100644 index 00000000..2e43cd83 --- /dev/null +++ b/nmostesting/suites/BCP0050201Test.py @@ -0,0 +1,482 @@ +# Copyright (C) 2025 Matrox Graphics Inc. +# Copyright (C) 2025 Advanced Media Workflow Association +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import re +import os + +from jsonschema import ValidationError + +from ..GenericTest import GenericTest, NMOSTestException +from ..IS04Utils import IS04Utils +from ..IS05Utils import IS05Utils +from ..TestHelper import load_resolved_schema +from ..TestHelper import check_content_type +from ..TestResult import Test + +from pathlib import Path + +NODE_API_KEY = "node" +CONNECTION_API_KEY = "connection" +RECEIVER_CAPS_KEY = "receiver-caps" +SENDER_CAPS_KEY = "sender-caps" + +# Generic capabilities from any namespace + + +def cap_without_namespace(s): + match = re.search(r'^urn:[a-z0-9][a-z0-9-]+:cap:(.*)', s) + return match.group(1) if match else None + + +def get_key_value(obj, name): + regex = re.compile(r'^urn:[a-z0-9][a-z0-9-]+:' + name + r'$') + for key, value in obj.items(): + if regex.fullmatch(key): + return value + return obj[name] # final try without a namespace + + +def has_key(obj, name): + regex = re.compile(r'^urn:[a-z0-9][a-z0-9-]+:' + name + r'$') + for key in obj.keys(): + if regex.fullmatch(key): + return True + return name in obj # final try without a namespace + + +class BCP0050201Test(GenericTest): + """ + Runs Node Tests covering sender and receiver with IPMX/HKEP + """ + + def __init__(self, apis, **kwargs): + # Don't auto-test /transportfile as it is permitted to generate a 404 when master_enable is false + omit_paths = [ + "/single/senders/{senderId}/transportfile", + "/single/senders/{senderId}/staged", + "/single/senders/{senderId}/active", + "/single/senders/{senderId}/constraints", + "/single/senders/{senderId}/transporttype", + "/single/receivers/{receiverId}/staged", + "/single/receivers/{receiverId}/active", + "/single/receivers/{receiverId}/constraints", + "/single/receivers/{receiverId}/transporttype", + ] + GenericTest.__init__(self, apis, omit_paths, **kwargs) + self.node_url = self.apis[NODE_API_KEY]["url"] + self.connection_url = self.apis[CONNECTION_API_KEY]["url"] + self.is04_resources = {"senders": {}, "receivers": {}, "_requested": [], "sources": {}, "flows": {}} + self.is05_resources = {"senders": [], "receivers": [], + "_requested": [], "transport_types": {}, "transport_files": {}} + self.is04_utils = IS04Utils(self.node_url) + self.is05_utils = IS05Utils(self.connection_url) + self.test = Test("default") + + # Utility function from IS0502Test + def get_is04_resources(self, resource_type): + """Retrieve all Senders or Receivers from a Node API, keeping hold of the returned objects""" + assert resource_type in ["senders", "receivers", "sources", "flows"] + + # Prevent this being executed twice in one test run + if resource_type in self.is04_resources["_requested"]: + return True, "" + + path_url = resource_type + full_url = self.node_url + path_url + valid, resources = self.do_request("GET", full_url) + if not valid: + return False, "Node API did not respond as expected: {}".format(resources) + schema = self.get_schema(NODE_API_KEY, "GET", "/" + path_url, resources.status_code) + valid, message = self.check_response(schema, "GET", resources) + if not valid: + raise NMOSTestException(self.test.FAIL(message)) + + try: + for resource in resources.json(): + self.is04_resources[resource_type][resource["id"]] = resource + self.is04_resources["_requested"].append(resource_type) + except json.JSONDecodeError: + return False, "Non-JSON response returned from Node API" + + return True, "" + + def get_is05_partial_resources(self, resource_type): + """Retrieve all Senders or Receivers from a Connection API, keeping hold of the returned IDs""" + assert resource_type in ["senders", "receivers"] + + # Prevent this being executed twice in one test run + if resource_type in self.is05_resources["_requested"]: + return True, "" + + path_url = "single/" + resource_type + full_url = self.connection_url + path_url + valid, resources = self.do_request("GET", full_url) + if not valid: + return False, "Connection API did not respond as expected: {}".format(resources) + + schema = self.get_schema(CONNECTION_API_KEY, "GET", "/" + path_url, resources.status_code) + valid, message = self.check_response(schema, "GET", resources) + if not valid: + raise NMOSTestException(self.test.FAIL(message)) + + # The following call to is05_utils.get_transporttype does not validate against the IS-05 schemas, + # which is good for allowing extended transport. The transporttype-response-schema.json schema is + # broken as it does not allow additional transport, nor x-nmos ones, nor vendor specific ones. + try: + for resource in resources.json(): + resource_id = resource.rstrip("/") + self.is05_resources[resource_type].append(resource_id) + if self.is05_utils.compare_api_version(self.apis[CONNECTION_API_KEY]["version"], "v1.1") >= 0: + transport_type = self.is05_utils.get_transporttype(resource_id, resource_type.rstrip("s")) + self.is05_resources["transport_types"][resource_id] = transport_type + else: + self.is05_resources["transport_types"][resource_id] = "urn:x-nmos:transport:rtp" + if resource_type == "senders": + transport_file = self.is05_utils.get_transportfile(resource_id) + self.is05_resources["transport_files"][resource_id] = transport_file + self.is05_resources["_requested"].append(resource_type) + except json.JSONDecodeError: + return False, "Non-JSON response returned from Node API" + + return True, "" + + def check_response_without_transport_params(self, schema, method, response): + """Confirm that a given Requests response conforms to the expected schema and has any expected headers without + considering the 'transport_params' attribute""" + ctype_valid, ctype_message = check_content_type(response.headers) + if not ctype_valid: + return False, ctype_message + + cors_valid, cors_message = self.check_CORS(method, response.headers) + if not cors_valid: + return False, cors_message + + fields_to_ignore = ["transport_params"] + + data = response.json() + + filtered_data = {k: v for k, v in data.items() if k not in fields_to_ignore} + + filtered_data["transport_params"] = [] + + try: + self.validate_schema(filtered_data, schema) + except ValidationError as e: + return False, "Response schema validation error {}".format(e) + except json.JSONDecodeError: + return False, "Invalid JSON received" + + return True, ctype_message + + def test_01(self, test): + """Check that version 1.3+ the Node API and version 1.1+ of the Connection API are available""" + + self.test = test + + # REFERENCE: Nodes compliant with this specification MUST implement [IS-04][] v1.3 or higher + # and [IS-05][] v1.1 or higher. + api = self.apis[NODE_API_KEY] + if self.is04_utils.compare_api_version(api["version"], "v1.3") >= 0: + valid, result = self.do_request("GET", self.node_url) + if not valid: + return test.FAIL("Node API did not respond as expected: {}".format(result)) + else: + return test.FAIL("Node API must be running v1.3 or newer in order to run this test suite") + + api = self.apis[CONNECTION_API_KEY] + if self.is04_utils.compare_api_version(api["version"], "v1.1") >= 0: + valid, result = self.do_request("GET", self.node_url) + if not valid: + return test.FAIL("Connection API did not respond as expected: {}".format(result)) + else: + return test.FAIL("Connection API must be running v1.1 or newer in order to run this test suite") + + return test.PASS() + + def test_02(self, test): + """Check HKEP Senders""" + + self.test = test + + api = self.apis[SENDER_CAPS_KEY] + + reg_api = self.apis["caps-register"] + reg_path = reg_api["spec_path"] + "/capabilities" + + # REFERENCE: Nodes capable of transmitting HDCP encrypted streams using the IPMX/HKEP protocol MUST have + # Source, Flow and Sender resources in the IS-04 Node API. + valid, result = self.get_is04_resources("senders") + if not valid: + return test.FAIL(result) + + schema = load_resolved_schema(api["spec_path"], "sender_constraint_sets.json") + + reg_schema_file = str(Path(os.path.abspath(reg_path)) / "constraint_set.json") + with open(reg_schema_file, "r") as f: + reg_schema_obj = json.load(f) + reg_schema = load_resolved_schema(api["spec_path"], schema_obj=reg_schema_obj) + + no_hkep_senders = True + access_error = False + + warning = "" + + if len(self.is04_resources["senders"].values()) == 0: + return test.UNCLEAR("No Senders were found on the Node") + + for sender in self.is04_resources["senders"].values(): + + # REFERENCE A Sender compliant with the [HKEP](#hkep) section of this document MUST provide a + # `hkep` Sender attribute to indicate that HDCP encryption and the HKEP + # protocol are used by the Sender. + # + # It is optional to be compliant with BCP-005-02 HKEP section so only test if the sender declares + # being compliant with BCP-005-02 (IPMX/HKEP). + if has_key(sender, "hkep"): + + no_hkep_senders = False + + # this if the state of the sender + hkep = get_key_value(sender, "hkep") + + # These are the states EXPLICITLY allowed by the sender's capabilities. + only_allow_true = None + only_allow_false = None + + # REFERENCE: A Sender MAY provide a `urn:x-nmos:cap:transport:hkep` capability to indicate that + # HDCP encryption and the HKEP protocol are supported. + # + # If only_allow_true and only_allow_true are None this indicates that capabilities were not provided. + if "constraint_sets" in sender["caps"]: + + try: + self.validate_schema(sender, schema) + except ValidationError as e: + return test.FAIL("Sender {} does not comply with schema, error {}".format(sender["id"], e)) + + for constraint_set in sender["caps"]["constraint_sets"]: + + try: + self.validate_schema(constraint_set, reg_schema) + except ValidationError as e: + return test.FAIL( + "Sender {} constraint_sets do not comply with registered schema, error {}".format( + sender["id"], e)) + + # Ignore disabled constraint sets + if ("urn:x-nmos:cap:meta:enabled" in constraint_set and + not constraint_set["urn:x-nmos:cap:meta:enabled"]): + continue + + # Explicit declarations only + if has_key(constraint_set, "cap:transport:hkep"): + param_constraint = get_key_value(constraint_set, "cap:transport:hkep") + if "enum" in param_constraint: + if (True in param_constraint["enum"]) and (False not in param_constraint["enum"]): + only_allow_false = False + if only_allow_true is None: + only_allow_true = True + if (False in param_constraint["enum"]) and (True not in param_constraint["enum"]): + only_allow_true = False + if only_allow_false is None: + only_allow_false = True + else: + only_allow_true = False + only_allow_false = False + + # Check that the sender state does not contradict its explicit capabilities + if only_allow_true is not None: + # REFERENCE: If the `urn:x-nmos:cap:transport:hkep` capability only allows the value `true`, then + # the Sender's associated SDP transport file MUST have an `hkep` attribute. + # REFERENCE: A Sender MAY provide a `urn:x-nmos:cap:transport:hkep` capability to indicate that + # HDCP encryption and the HKEP protocol are supported. + # + # If so it must be consistent + if only_allow_true and not hkep: + return test.FAIL( + "Sender {} has an invalid 'hkep' state {} " + "which is not allowed by the Sender's capabilities only allowing 'true'".format( + sender["id"], hkep)) + + if only_allow_false is not None: + + # REFERENCE: If the `urn:x-nmos:cap:transport:hkep` capability only allows the value `false`, then + # the Sender's associated SDP transport file MUST NOT have an `hkep` attribute. + # REFERENCE: A Sender MAY provide a `urn:x-nmos:cap:transport:hkep` capability to indicate that + # HDCP encryption and the HKEP protocol are supported. + # + # If so it must be consistent + if only_allow_false and hkep: + return test.FAIL( + "Sender {} has an invalid 'hkep' state {} " + "which is not allowed by the Sender's capabilities only allowing 'false'".format( + sender["id"], hkep)) + + # Check SDP transport file. As per IS04 sender.json schema this attribute is required. + if "manifest_href" not in sender: + return test.FAIL("Sender {} MUST indicate the 'manifest_href' attribute." + .format(sender["id"])) + + # REFERENCE: If an SDP transport file is not currently available because the Sender is inactive, this + # attribute indicates whether or not such an SDP transport file would contain an `hkep` + # attribute if the Sender were active at that time. + # + # There may be no SDP transport file because the sender it inactive or because the transport used by + # the Sender does not support an SDP transport file. In both case we'll return an UNCLEAR status to + # indicate "could not test". + href = sender["manifest_href"] + if not href: + access_error = True + continue + + manifest_href_valid, manifest_href_response = self.do_request("GET", href) + if manifest_href_valid and manifest_href_response.status_code == 200: + pass + elif manifest_href_valid and manifest_href_response.status_code == 404: + access_error = True + continue + else: + return test.FAIL("Sender {} unexpected response from manifest_href '{}': {}" + .format(sender["id"], href, manifest_href_response)) + + sdp = manifest_href_response.text + sdp_lines = [sdp_line.replace("\r", "") for sdp_line in sdp.split("\n")] + + found_hkep = False + for sdp_line in sdp_lines: + hkep_attribute = re.search(r"^a=hkep:(.+)$", sdp_line) + if hkep_attribute: + found_hkep = True + + # SDP transport file must match with capabilities + if only_allow_true is not None: + # REFERENCE: If the `urn:x-nmos:cap:transport:hkep` capability only allows the value `true`, then + # the Sender's associated SDP transport file MUST have an `hkep` attribute. + if only_allow_true and not found_hkep: + return test.FAIL( + "Sender {} has an invalid SDP transport file without an 'hkep' attribute " + "which is not allowed by the Sender's capabilities only allowing 'true'".format( + sender["id"])) + + if only_allow_false is not None: + # REFERENCE: If the `urn:x-nmos:cap:transport:hkep` capability only allows the value `false`, then + # the Sender's associated SDP transport file MUST NOT have an `hkep` attribute. + if only_allow_false and found_hkep: + return test.FAIL( + "Sender {} has an invalid SDP transport file with an 'hkep' attribute " + "which is not allowed by the Sender's capabilities only allowing 'false'".format( + sender["id"])) + + # REFERENCE: A Sender compliant with the [HKEP](#hkep) section of this document MUST provide a + # `hkep` Sender attribute to indicate that HDCP encryption and the HKEP + # protocol are used by the Sender. This attribute MUST be `true` if an `hkep` attribute + # is present in the Sender's SDP transport file, and MUST be `false` if no `hkep` + # attributes are present. + # + # sender state must match with SDP transport file + if hkep != found_hkep: + return test.FAIL( + "Sender {} has an invalid SDP transport file {} an 'hkep' attribute " + "which does not match with the Sender hkep attribute {}".format( + sender["id"], "with" if found_hkep else "without", hkep)) + + if access_error: + return test.UNCLEAR("One or more of the tested Senders had null or empty 'manifest_href' or " + "returned a 404 HTTP code. Please ensure all Senders are enabled and re-test.") + if no_hkep_senders: + return test.OPTIONAL("No BCP-005-02 (IPMX/HKEP) Sender found") + + if warning != "": + return test.WARNING(warning) + else: + return test.PASS() + + def test_03(self, test): + """Check HKEP Receivers""" + + self.test = test + + api = self.apis[RECEIVER_CAPS_KEY] + + reg_api = self.apis["caps-register"] + reg_path = reg_api["spec_path"] + "/capabilities" + + # REFERENCE: Nodes capable of receiving HDCP encrypted streams using the IPMX/HKEP protocol MUST have Receiver + # resources in the IS-04 Node API. + valid, result = self.get_is04_resources("receivers") + if not valid: + return test.FAIL(result) + + schema = load_resolved_schema(api["spec_path"], "receiver_constraint_sets.json") + + reg_schema_file = str(Path(os.path.abspath(reg_path)) / "constraint_set.json") + with open(reg_schema_file, "r") as f: + reg_schema_obj = json.load(f) + reg_schema = load_resolved_schema(api["spec_path"], schema_obj=reg_schema_obj) + + no_hkep_receivers = True + no_constraint_sets = True + + warning = "" + + if len(self.is04_resources["receivers"].values()) == 0: + return test.UNCLEAR("No Receivers were found on the Node") + + for receiver in self.is04_resources["receivers"].values(): + + if "constraint_sets" in receiver["caps"]: + + no_constraint_sets = False + + try: + self.validate_schema(receiver, schema) + except ValidationError as e: + return test.FAIL("Receiver {} does not comply with schema, error {}".format(receiver["id"], e)) + + for constraint_set in receiver["caps"]["constraint_sets"]: + + try: + self.validate_schema(constraint_set, reg_schema) + except ValidationError as e: + return test.FAIL( + "Receiver {} constraint_sets do not comply with registered schema, error {}".format( + receiver["id"], e)) + + # Ignore disabled constraint sets + if ("urn:x-nmos:cap:meta:enabled" in constraint_set and + not constraint_set["urn:x-nmos:cap:meta:enabled"]): + continue + + # REFERENCE: A Receiver SHOULD provide a `urn:x-nmos:cap:transport:hkep` capability to indicate + # its support for Senders that use HDCP encryption and the HKEP protocol. + + # As it is optional to declare such capability and tag receiver having an explicit declarations + # only. If a receiver provides capabilities and declare an hkep capability it will be validated + # by the schemas verifications above. + if has_key(constraint_set, "cap:transport:hkep"): + no_hkep_receivers = False + + if no_constraint_sets: + return test.OPTIONAL("No Receiver describing BCP-004-01 Capabilities found") + + if no_hkep_receivers: + return test.OPTIONAL("No BCP-005-02 (IPMX/HKEP) Receiver found") + + if warning != "": + return test.WARNING(warning) + else: + return test.PASS()