1
- using Microsoft . Extensions . Configuration ;
1
+ using Microsoft . Extensions . Configuration ;
2
2
using MQTTnet ;
3
+ using MQTTnet . Protocol ;
3
4
using NBomber ;
4
5
using NBomber . CSharp ;
5
6
using NBomber . Data ;
@@ -16,10 +17,12 @@ public class CustomScenarioSettings
16
17
17
18
public class ClientPoolMqttExample
18
19
{
20
+ // For this example, please spin up local MQTT broker via docker-compose.yml located in the MQTT folder.
21
+
19
22
public void Run ( )
20
23
{
21
24
var clientPool = new ClientPool < MqttClient > ( ) ;
22
- var message = Data . GenerateRandomBytes ( 200 ) ;
25
+ byte [ ] payload = [ ] ;
23
26
24
27
var scenario = Scenario . Create ( "mqtt_scenario" , async ctx =>
25
28
{
@@ -30,30 +33,32 @@ public void Run()
30
33
var topic = $ "/clients/{ ctx . ScenarioInfo . InstanceId } ";
31
34
var msg = new MqttApplicationMessageBuilder ( )
32
35
. WithTopic ( topic )
33
- . WithPayload ( message )
36
+ . WithPayload ( payload )
37
+ . WithQualityOfServiceLevel ( MqttQualityOfServiceLevel . AtMostOnce )
34
38
. Build ( ) ;
35
39
36
40
return await mqttClient . Publish ( msg ) ;
37
41
} ) ;
38
42
39
43
var receive = await Step . Run ( "receive" , ctx , async ( ) =>
40
- await mqttClient . Receive ( ctx . ScenarioCancellationToken ) ) ;
44
+ {
45
+ var response = await mqttClient . Receive ( ctx . ScenarioCancellationToken ) ;
46
+ return response ;
47
+ } ) ;
41
48
42
49
return Response . Ok ( ) ;
43
50
} )
44
- . WithWarmUpDuration ( TimeSpan . FromSeconds ( 3 ) )
45
- . WithLoadSimulations ( Simulation . KeepConstant ( copies : 1 , during : TimeSpan . FromSeconds ( 30 ) ) )
46
51
. WithInit ( async context =>
47
52
{
48
53
var config = context . CustomSettings . Get < CustomScenarioSettings > ( ) ;
49
- message = Data . GenerateRandomBytes ( config . MsgSizeBytes ) ;
54
+ payload = Data . GenerateRandomBytes ( config . MsgSizeBytes ) ;
50
55
51
56
for ( var i = 0 ; i < config . ClientCount ; i ++ )
52
57
{
53
58
var topic = $ "/clients/mqtt_scenario_{ i } ";
54
59
var clientId = $ "mqtt_client_{ i } ";
55
60
var options = new MqttClientOptionsBuilder ( )
56
- . WithWebSocketServer ( options => { options . WithUri ( config . MqttServerUrl ) ; } )
61
+ . WithTcpServer ( config . MqttServerUrl )
57
62
. WithClientId ( clientId )
58
63
. Build ( ) ;
59
64
@@ -62,16 +67,18 @@ public void Run()
62
67
63
68
if ( ! connectResult . IsError )
64
69
{
65
- await mqttClient . Subscribe ( topic ) ;
70
+ await mqttClient . Subscribe ( topic , MqttQualityOfServiceLevel . AtMostOnce ) ;
66
71
clientPool . AddClient ( mqttClient ) ;
67
72
}
68
73
else
69
74
throw new Exception ( "client can't connect to the MQTT broker" ) ;
75
+
76
+ await Task . Delay ( 10 ) ;
70
77
}
71
78
} )
72
79
. WithClean ( ctx =>
73
80
{
74
- clientPool . DisposeClients ( async client => await client . Disconnect ( ) ) ;
81
+ clientPool . DisposeClients ( client => client . Dispose ( ) ) ;
75
82
return Task . CompletedTask ;
76
83
} ) ;
77
84
0 commit comments