Skip to content

Commit 7276e0f

Browse files
committed
[DMI] Automate the process of device registration #1
- read from agreed topic using the VesEventSchema and call the registration endpoint in cps-ncmp to automate the device registration during mounting of the device. - also added non cloud kafka consumer factory as VES event is not cloud compliant - testware added Issue-ID: CPS-2710 Change-Id: I5ab695afc225dcc372cff00a2f6f69c9047b14ed Signed-off-by: mpriyank <[email protected]>
1 parent a6412ca commit 7276e0f

File tree

8 files changed

+314
-7
lines changed

8 files changed

+314
-7
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* ============LICENSE_START========================================================
3+
* Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
4+
* ================================================================================
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
* SPDX-License-Identifier: Apache-2.0
18+
* ============LICENSE_END=========================================================
19+
*/
20+
21+
package org.onap.cps.ncmp.dmi.cmstack.ves;
22+
23+
import java.util.List;
24+
import lombok.RequiredArgsConstructor;
25+
import lombok.extern.slf4j.Slf4j;
26+
import org.onap.cps.ncmp.dmi.service.DmiService;
27+
import org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema;
28+
import org.springframework.kafka.annotation.KafkaListener;
29+
import org.springframework.stereotype.Component;
30+
31+
@Slf4j
32+
@Component
33+
@RequiredArgsConstructor
34+
public class VesEventConsumer {
35+
36+
private final DmiService dmiService;
37+
38+
/**
39+
* Consume the VES event to discover the cm handles.
40+
*
41+
* @param vesEventSchema Schema for virtual network function event stream
42+
*/
43+
@KafkaListener(topics = "#{@vesEventsConfiguration.topicNames}",
44+
containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory",
45+
properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema"})
46+
public void consumeVesEvent(final VesEventSchema vesEventSchema) {
47+
48+
final String sourceName = vesEventSchema.getEvent().getCommonEventHeader().getSourceName();
49+
log.info("SourceName( CmHandleId ) from the VES event is : {}", sourceName);
50+
try {
51+
dmiService.registerCmHandles(List.of(sourceName));
52+
} catch (final Exception exception) {
53+
log.warn("Exception occurred for CmHandleId : {} with cause : {}", sourceName, exception.getMessage(),
54+
exception);
55+
}
56+
}
57+
58+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* ============LICENSE_START========================================================
3+
* Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
4+
* ================================================================================
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
* SPDX-License-Identifier: Apache-2.0
18+
* ============LICENSE_END=========================================================
19+
*/
20+
21+
package org.onap.cps.ncmp.dmi.cmstack.ves;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import lombok.Getter;
26+
import lombok.Setter;
27+
import org.springframework.boot.context.properties.ConfigurationProperties;
28+
import org.springframework.stereotype.Component;
29+
30+
31+
@Getter
32+
@Setter
33+
@Component
34+
@ConfigurationProperties(prefix = "app.dmi.ves")
35+
public class VesEventsConfiguration {
36+
37+
private final List<String> topicNames = new ArrayList<>();
38+
39+
}

dmi-service/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
* ============LICENSE_START=======================================================
3-
* Copyright (C) 2023 Nordix Foundation
3+
* Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
44
* ================================================================================
55
* Licensed under the Apache License, Version 2.0 (the "License");
66
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
package org.onap.cps.ncmp.dmi.config.kafka;
2222

2323
import io.cloudevents.CloudEvent;
24+
import java.time.Duration;
2425
import java.util.Map;
2526
import lombok.RequiredArgsConstructor;
2627
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -132,11 +133,27 @@ public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
132133
*/
133134
@Bean
134135
public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
135-
cloudEventConcurrentKafkaListenerContainerFactory() {
136-
final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
137-
new ConcurrentKafkaListenerContainerFactory<>();
138-
containerFactory.setConsumerFactory(cloudEventConsumerFactory());
139-
return containerFactory;
136+
cloudEventConcurrentKafkaListenerContainerFactory() {
137+
final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> concurrentKafkaListenerContainerFactory =
138+
new ConcurrentKafkaListenerContainerFactory<>();
139+
concurrentKafkaListenerContainerFactory.setConsumerFactory(cloudEventConsumerFactory());
140+
return concurrentKafkaListenerContainerFactory;
141+
}
142+
143+
/**
144+
* A legacy concurrent kafka listener container factory.
145+
*
146+
* @return instance of Concurrent kafka listener factory
147+
*/
148+
@Bean
149+
@Primary
150+
public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
151+
final ConcurrentKafkaListenerContainerFactory<String, T> concurrentKafkaListenerContainerFactory =
152+
new ConcurrentKafkaListenerContainerFactory<>();
153+
concurrentKafkaListenerContainerFactory.setConsumerFactory(legacyEventConsumerFactory());
154+
concurrentKafkaListenerContainerFactory.getContainerProperties()
155+
.setAuthExceptionRetryInterval(Duration.ofSeconds(10));
156+
return concurrentKafkaListenerContainerFactory;
140157
}
141158

142159
}

dmi-service/src/main/resources/application.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ app:
7474
avc:
7575
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
7676
cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
77+
ves:
78+
topicNames:
79+
- "unauthenticated.VES_PNFREG_OUTPUT"
80+
- "unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT"
7781

7882
notification:
7983
async:
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* ============LICENSE_START========================================================
3+
* Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
4+
* ================================================================================
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
* SPDX-License-Identifier: Apache-2.0
18+
* ============LICENSE_END=========================================================
19+
*/
20+
21+
package org.onap.cps.ncmp.dmi.cmstack.ves
22+
23+
import ch.qos.logback.classic.Level
24+
import ch.qos.logback.classic.Logger
25+
import ch.qos.logback.classic.spi.ILoggingEvent
26+
import ch.qos.logback.core.read.ListAppender
27+
import com.fasterxml.jackson.databind.ObjectMapper
28+
import org.onap.cps.ncmp.dmi.TestUtils
29+
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
30+
import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException
31+
import org.onap.cps.ncmp.dmi.service.DmiService
32+
import org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema
33+
import org.slf4j.LoggerFactory
34+
import org.spockframework.spring.SpringBean
35+
import org.springframework.boot.test.context.SpringBootTest
36+
import org.springframework.test.annotation.DirtiesContext
37+
import org.testcontainers.spock.Testcontainers
38+
39+
@SpringBootTest(classes = [ObjectMapper])
40+
@Testcontainers
41+
@DirtiesContext
42+
class VesEventConsumerSpec extends MessagingBaseSpec {
43+
44+
def objectMapper = new ObjectMapper()
45+
def dmiService = Mock(DmiService)
46+
47+
@SpringBean
48+
VesEventConsumer objectUnderTest = new VesEventConsumer(dmiService)
49+
50+
def logger = Spy(ListAppender<ILoggingEvent>)
51+
52+
def vesEvent
53+
54+
void setup() {
55+
56+
def jsonData = TestUtils.getResourceFileContent('sampleVesEvent.json')
57+
vesEvent = objectMapper.readValue(jsonData, VesEventSchema.class)
58+
59+
((Logger) LoggerFactory.getLogger(VesEventConsumer.class)).addAppender(logger)
60+
logger.start()
61+
}
62+
63+
void cleanup() {
64+
((Logger) LoggerFactory.getLogger(VesEventConsumer.class)).detachAndStopAllAppenders()
65+
}
66+
67+
68+
def 'Consume a VES event'() {
69+
when: 'event is consumed'
70+
objectUnderTest.consumeVesEvent(vesEvent)
71+
then: 'cm handle(s) is registered with the dmi service'
72+
1 * dmiService.registerCmHandles(['pynts-o-du-o1'])
73+
74+
}
75+
76+
def 'Consume create event with error during registration'() {
77+
given: 'an error occured during registration'
78+
dmiService.registerCmHandles(_) >> { throw new CmHandleRegistrationException('some error for test') }
79+
when: 'event is consumed'
80+
objectUnderTest.consumeVesEvent(vesEvent)
81+
then: 'the correct exception is logged as a warning'
82+
def loggingEvent = logger.list[1]
83+
assert loggingEvent.level == Level.WARN
84+
assert loggingEvent.formattedMessage.contains('Not able to register the given cm-handles.')
85+
}
86+
87+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* ============LICENSE_START========================================================
3+
* Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
4+
* ================================================================================
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
* SPDX-License-Identifier: Apache-2.0
18+
* ============LICENSE_END=========================================================
19+
*/
20+
21+
package org.onap.cps.ncmp.dmi.cmstack.ves
22+
23+
import org.springframework.beans.factory.annotation.Autowired
24+
import org.springframework.boot.context.properties.EnableConfigurationProperties
25+
import org.springframework.boot.test.context.SpringBootTest
26+
import org.springframework.test.context.ContextConfiguration
27+
import spock.lang.Specification
28+
29+
@SpringBootTest
30+
@ContextConfiguration(classes = [VesEventsConfiguration])
31+
@EnableConfigurationProperties
32+
class VesEventsConfigurationSpec extends Specification {
33+
34+
@Autowired
35+
VesEventsConfiguration vesEventsConfiguration
36+
37+
def 'Check the test topics configured'() {
38+
expect: 'VES topics are populated'
39+
assert vesEventsConfiguration.topicNames.size() == 2
40+
and: 'correct topics are present'
41+
assert vesEventsConfiguration.topicNames.contains('unauthenticated.VES_PNFREG_OUTPUT')
42+
assert vesEventsConfiguration.topicNames.contains('unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT')
43+
}
44+
45+
}

dmi-service/src/test/resources/application.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# ============LICENSE_START=======================================================
2-
# Copyright (C) 2021-2023 Nordix Foundation
2+
# Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
33
# ================================================================================
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -74,6 +74,11 @@ app:
7474
ncmp:
7575
async:
7676
topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
77+
dmi:
78+
ves:
79+
topicNames:
80+
- "unauthenticated.VES_PNFREG_OUTPUT"
81+
- "unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT"
7782

7883
logging:
7984
format: json
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{
2+
"event": {
3+
"commonEventHeader": {
4+
"sourceId": "ManagementElement=pynts-o-du-o1",
5+
"startEpochMicrosec": 1742470240133788,
6+
"eventId": "ManagedElement=pynts-o-du-o1_pnfRegistration",
7+
"timeZoneOffset": "+00:00",
8+
"reportingEntityId": "",
9+
"internalHeaderFields": {
10+
"collectorTimeStamp": "Thu, 03 20 2025 11:30:40 GMT"
11+
},
12+
"eventType": "PyNTS_pnfRegistration",
13+
"priority": "Low",
14+
"version": "4.1",
15+
"nfVendorName": "pynts",
16+
"reportingEntityName": "ManagementElement=pynts-o-du-o1",
17+
"sequence": 0,
18+
"domain": "pnfRegistration",
19+
"lastEpochMicrosec": 1742470240133788,
20+
"eventName": "pnfRegistration_PyNTS_pnfRegistration",
21+
"vesEventListenerVersion": "7.2.1",
22+
"sourceName": "pynts-o-du-o1",
23+
"nfNamingCode": "001"
24+
},
25+
"pnfRegistrationFields": {
26+
"unitType": "o-du-o1",
27+
"macAddress": "36:e9:09:e9:28:36",
28+
"serialNumber": "pynts-o-du-o1-172.20.0.5-pynts",
29+
"additionalFields": {
30+
"protocol": "TLS",
31+
"oamPort": "6513",
32+
"betweenAttemptsTimeout": "2000",
33+
"keepaliveDelay": "120",
34+
"sleep-factor": "1.5",
35+
"reconnectOnChangedSchema": "false",
36+
"keyId": "tls-endpoint",
37+
"connectionTimeout": "20000",
38+
"maxConnectionAttempts": "100",
39+
"tcpOnly": "false",
40+
"username": "netconf"
41+
},
42+
"pnfRegistrationFieldsVersion": "2.1",
43+
"manufactureDate": "2021-01-16",
44+
"modelNumber": "pynts",
45+
"lastServiceDate": "2021-03-26",
46+
"unitFamily": "pynts-o-du-o1",
47+
"vendorName": "pynts",
48+
"oamV4IpAddress": "172.20.0.5",
49+
"softwareVersion": "2.3.5"
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)