Skip to content

Commit c7a4e2d

Browse files
committed
improved AMQP examples
1 parent 377a3c9 commit c7a4e2d

File tree

7 files changed

+37
-34
lines changed

7 files changed

+37
-34
lines changed

examples/Demo/AMQP/ClientPool/ClientPoolAmqpExample.cs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,23 @@ public void Run()
2525
byte[] message = [];
2626
var usePersistence = false;
2727

28-
var scenario = Scenario.Create("amqp_scenario", async ctx =>
28+
var scenario = Scenario.Create("client_pool_scenario", async ctx =>
2929
{
30+
// get a client from the pool by Scenario InstanceID
3031
var client = clientPool.GetClient(ctx.ScenarioInfo);
3132

3233
var publish = await Step.Run("publish", ctx, async () =>
3334
{
34-
var scenarioInstanceId = ctx.ScenarioInfo.InstanceId;
35-
var prop = new BasicProperties { Persistent = usePersistence };
35+
var queueName = $"queue_{ctx.ScenarioInfo.InstanceNumber}";
36+
var props = new BasicProperties { Persistent = usePersistence };
3637

37-
var response = await client.Publish(exchange: "myExchange", routingKey: scenarioInstanceId, basicProperties: prop, body: message);
38+
var response = await client.Publish(exchange: "myExchange", routingKey: queueName, props, message);
3839
return response;
3940
});
4041

4142
var receive = await Step.Run("receive", ctx, async () =>
4243
{
44+
// pass the ScenarioCancellationToken to stop waiting for a response if the scenario finish event is triggered
4345
var response = await client.Receive(ctx.ScenarioCancellationToken);
4446
return response;
4547
});
@@ -54,19 +56,21 @@ public void Run()
5456

5557
var factory = new ConnectionFactory { HostName = config.AmqpServerUrl };
5658

59+
// initialize a client and add it to the ClientPool
5760
for (var i = 0; i < config.ClientCount; i++)
5861
{
5962
var connection = await factory.CreateConnectionAsync();
6063
var channel = await connection.CreateChannelAsync();
6164
var amqpClient = new AmqpClient(channel);
6265

63-
var scenarioInstanceId = $"amqp_scenario_{i}";
64-
var result = await amqpClient.Connect(exchange: "myExchange", exchangeType: ExchangeType.Direct, queue: scenarioInstanceId,
65-
routingKey: scenarioInstanceId, durable: usePersistence);
66+
var queueName = $"queue_{i}";
67+
68+
var result = await amqpClient.DeclareQueue(exchange: "myExchange", exchangeType: ExchangeType.Direct, queue: queueName,
69+
routingKey: queueName, durable: usePersistence);
6670

6771
if (!result.IsError)
6872
{
69-
await amqpClient.Subscribe(queue: scenarioInstanceId);
73+
await amqpClient.Subscribe(queue: queueName);
7074
clientPool.AddClient(amqpClient);
7175
}
7276
else

examples/Demo/AMQP/ClientPool/config.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
"TestSuite": "AMQP",
33
"TestName": "client_pool",
44

5-
"TargetScenarios": [ "amqp_scenario" ],
5+
"TargetScenarios": [ "client_pool_scenario" ],
66

77
"GlobalSettings": {
88
"ScenariosSettings": [
99
{
10-
"ScenarioName": "amqp_scenario",
10+
"ScenarioName": "client_pool_scenario",
1111

1212
"WarmUpDuration": "00:00:05",
1313

examples/Demo/AMQP/IndependentActors/AmqpPublishScenario.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,14 @@ public ScenarioProps Create()
2121
var publish = await Step.Run("publish", ctx, async () =>
2222
{
2323
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
24-
var prop = new BasicProperties
24+
25+
var props = new BasicProperties
2526
{
2627
// We include the current timestamp so the consumer can calculate the final latency.
27-
Headers = new Dictionary<string, object>
28-
{
29-
{ "timestamp", timestamp }
30-
}
28+
Headers = new Dictionary<string, object> { { "timestamp", timestamp } }
3129
};
3230

33-
return await amqpClient.Publish(exchange: "myExchange", routingKey: "IndependentActors", prop, body: payload);
31+
return await amqpClient.Publish(exchange: "myExchange", routingKey: "myQueue", props, payload);
3432
});
3533

3634
return Response.Ok();
@@ -45,8 +43,8 @@ public ScenarioProps Create()
4543
var channel = await connection.CreateChannelAsync();
4644
amqpClient = new AmqpClient(channel);
4745

48-
await amqpClient.Connect(exchange: "myExchange", exchangeType: ExchangeType.Direct, queue: "IndependentActors",
49-
routingKey: "IndependentActors");
46+
await amqpClient.DeclareQueue(exchange: "myExchange", exchangeType: ExchangeType.Direct, queue: "myQueue",
47+
routingKey: "myQueue");
5048
})
5149
.WithClean(async ctx =>
5250
{

examples/Demo/AMQP/IndependentActors/AmqpConsumeScenario.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ public ScenarioProps Create()
3535
var channel = await connection.CreateChannelAsync();
3636
amqpClient = new AmqpClient(channel);
3737

38-
await amqpClient.Connect(exchange: "myExchange", exchangeType: ExchangeType.Direct, queue: "IndependentActors",
39-
routingKey: "IndependentActors");
38+
await amqpClient.DeclareQueue(exchange: "myExchange", exchangeType: ExchangeType.Direct, queue: "myQueue",
39+
routingKey: "myQueue");
4040

41-
await amqpClient.Subscribe(queue: "IndependentActors", autoAck: true);
41+
await amqpClient.Subscribe(queue: "myQueue", autoAck: true);
4242
})
4343
.WithClean(async ctx =>
4444
{

examples/Demo/AMQP/PingPongAmqpTest.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,39 +14,39 @@ public void Run()
1414
var payload = Data.GenerateRandomBytes(200);
1515
var factory = new ConnectionFactory { HostName = "localhost" };
1616

17-
var scenario = Scenario.Create("ping_pong_amqp_scenario", async ctx =>
17+
var scenario = Scenario.Create("ping_pong_scenario", async ctx =>
1818
{
19-
AmqpClient amqpClient = null;
20-
2119
var connect = await Step.Run("connect", ctx, async () =>
2220
{
2321
var connection = await factory.CreateConnectionAsync();
2422
var channel = await connection.CreateChannelAsync();
2523

26-
amqpClient = new AmqpClient(channel);
27-
28-
var scenarioInstanceId = ctx.ScenarioInfo.InstanceId;
29-
30-
return await amqpClient.Connect(exchange: "myExchange", exchangeType: ExchangeType.Direct, queue: scenarioInstanceId,
31-
routingKey: scenarioInstanceId);
24+
var amqpClient = new AmqpClient(channel);
25+
return Response.Ok(payload: amqpClient);
3226
});
3327

28+
using var amqpClient = connect.Payload.Value;
29+
3430
var subscribe = await Step.Run("subscribe", ctx, async () =>
3531
{
3632
var queueName = ctx.ScenarioInfo.InstanceId;
33+
34+
await amqpClient.DeclareQueue(exchange: "myExchange", exchangeType: ExchangeType.Direct, queue: queueName,
35+
routingKey: queueName);
36+
3737
return await amqpClient.Subscribe(queue: queueName, autoAck: true);
3838
});
3939

4040
var publish = await Step.Run("publish", ctx, async () =>
4141
{
4242
var queueName = ctx.ScenarioInfo.InstanceId;
43-
var prop = new BasicProperties();
44-
return await amqpClient.Publish(exchange: "myExchange", routingKey: queueName, prop, body: payload);
43+
return await amqpClient.Publish(exchange: "myExchange", routingKey: queueName, body: payload);
4544
});
4645

4746
var receive = await Step.Run("receive", ctx, async () =>
4847
{
49-
var response = await amqpClient.Receive().AsTask();
48+
// Here, we pass the ScenarioCancellationToken to stop waiting for a response if the scenario finish event is triggered
49+
var response = await amqpClient.Receive(ctx.ScenarioCancellationToken);
5050
return response;
5151
});
5252

examples/Demo/Demo.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585

8686
<PackageReference Include="NBomber" Version="6.1.0-beta.1" />
8787

88-
<PackageReference Include="NBomber.AMQP" Version="0.1.3-beta.2" />
88+
<PackageReference Include="NBomber.AMQP" Version="0.2.0" />
8989
<PackageReference Include="NBomber.Data" Version="6.0.0" />
9090
<PackageReference Include="NBomber.Http" Version="6.0.2" />
9191
<PackageReference Include="NBomber.MQTT" Version="0.4.1-beta.2" />

examples/Demo/MQTT/PingPongMqttTest.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public void Run()
4545

4646
var receive = await Step.Run("receive", ctx, async () =>
4747
{
48+
// pass the ScenarioCancellationToken to stop waiting for a response if the scenario finish event is triggered
4849
var response = await mqttClient.Receive(ctx.ScenarioCancellationToken);
4950
return response;
5051
});

0 commit comments

Comments
 (0)