Skip to content

Cleanly dispose of RabbitMQ connections #4911

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 88 additions & 76 deletions utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,120 +104,132 @@ public static void DoWork(string queue, string group) {
class RabbitMQProducer {
public static void DoWork(string queue, string exchange, string routing_key)
{
var helper = new RabbitMQHelper();
helper.ExchangeDeclare(exchange, ExchangeType.Direct);
helper.CreateQueue(queue);
helper.QueueBind(queue, exchange, routing_key);
using (var helper = new RabbitMQHelper())
{
helper.ExchangeDeclare(exchange, ExchangeType.Direct);
helper.CreateQueue(queue);
helper.QueueBind(queue, exchange, routing_key);

helper.ExchangePublish(exchange, routing_key, "hello world");
Console.WriteLine("[rabbitmq] Produced message");
helper.ExchangePublish(exchange, routing_key, "hello world");
Console.WriteLine("[rabbitmq] Produced message");
}
}
}

class RabbitMQConsumer {
public static void DoWork(string queue, string exchange, string routing_key)
{
var helper = new RabbitMQHelper();
helper.ExchangeDeclare(exchange, ExchangeType.Direct);
helper.CreateQueue(queue);
helper.QueueBind(queue, exchange, routing_key);

helper.AddListener(queue, message =>
using (var helper = new RabbitMQHelper())
{
Console.WriteLine("[rabbitmq] Consumed message");
});
helper.ExchangeDeclare(exchange, ExchangeType.Direct);
helper.CreateQueue(queue);
helper.QueueBind(queue, exchange, routing_key);

helper.AddListener(queue, message =>
{
Console.WriteLine("[rabbitmq] Consumed message");
});
}
}
}

class RabbitMQProducerFanoutExchange {
public static void DoWork()
{
var helper = new RabbitMQHelper();
helper.ExchangeDeclare("systemTestFanoutExchange", ExchangeType.Fanout);
helper.CreateQueue("systemTestRabbitmqFanoutQueue1");
helper.CreateQueue("systemTestRabbitmqFanoutQueue2");
helper.CreateQueue("systemTestRabbitmqFanoutQueue3");
helper.QueueBind("systemTestRabbitmqFanoutQueue1", "systemTestFanoutExchange", "");
helper.QueueBind("systemTestRabbitmqFanoutQueue2", "systemTestFanoutExchange", "");
helper.QueueBind("systemTestRabbitmqFanoutQueue3", "systemTestFanoutExchange", "");
using (var helper = new RabbitMQHelper())
{
helper.ExchangeDeclare("systemTestFanoutExchange", ExchangeType.Fanout);
helper.CreateQueue("systemTestRabbitmqFanoutQueue1");
helper.CreateQueue("systemTestRabbitmqFanoutQueue2");
helper.CreateQueue("systemTestRabbitmqFanoutQueue3");
helper.QueueBind("systemTestRabbitmqFanoutQueue1", "systemTestFanoutExchange", "");
helper.QueueBind("systemTestRabbitmqFanoutQueue2", "systemTestFanoutExchange", "");
helper.QueueBind("systemTestRabbitmqFanoutQueue3", "systemTestFanoutExchange", "");

helper.ExchangePublish("systemTestFanoutExchange", "", "hello world, fanout exchange!");
Console.WriteLine("[rabbitmq_fanout] Produced message");
helper.ExchangePublish("systemTestFanoutExchange", "", "hello world, fanout exchange!");
Console.WriteLine("[rabbitmq_fanout] Produced message");
}
}
}

class RabbitMQConsumerFanoutExchange {
public static void DoWork()
{
var helper = new RabbitMQHelper();
helper.ExchangeDeclare("systemTestFanoutExchange", ExchangeType.Fanout);
helper.CreateQueue("systemTestRabbitmqFanoutQueue1");
helper.CreateQueue("systemTestRabbitmqFanoutQueue2");
helper.CreateQueue("systemTestRabbitmqFanoutQueue3");
helper.QueueBind("systemTestRabbitmqFanoutQueue1", "systemTestFanoutExchange", "");
helper.QueueBind("systemTestRabbitmqFanoutQueue2", "systemTestFanoutExchange", "");
helper.QueueBind("systemTestRabbitmqFanoutQueue3", "systemTestFanoutExchange", "");

helper.AddListener("systemTestRabbitmqFanoutQueue1", message =>
{
Console.WriteLine("[rabbitmq_fanout] Consumed message: " + message);
});
helper.AddListener("systemTestRabbitmqFanoutQueue2", message =>
{
Console.WriteLine("[rabbitmq_fanout] Consumed message: " + message);
});
helper.AddListener("systemTestRabbitmqFanoutQueue3", message =>
using (var helper = new RabbitMQHelper())
{
Console.WriteLine("[rabbitmq_fanout] Consumed message: " + message);
});
helper.ExchangeDeclare("systemTestFanoutExchange", ExchangeType.Fanout);
helper.CreateQueue("systemTestRabbitmqFanoutQueue1");
helper.CreateQueue("systemTestRabbitmqFanoutQueue2");
helper.CreateQueue("systemTestRabbitmqFanoutQueue3");
helper.QueueBind("systemTestRabbitmqFanoutQueue1", "systemTestFanoutExchange", "");
helper.QueueBind("systemTestRabbitmqFanoutQueue2", "systemTestFanoutExchange", "");
helper.QueueBind("systemTestRabbitmqFanoutQueue3", "systemTestFanoutExchange", "");

helper.AddListener("systemTestRabbitmqFanoutQueue1", message =>
{
Console.WriteLine("[rabbitmq_fanout] Consumed message: " + message);
});
helper.AddListener("systemTestRabbitmqFanoutQueue2", message =>
{
Console.WriteLine("[rabbitmq_fanout] Consumed message: " + message);
});
helper.AddListener("systemTestRabbitmqFanoutQueue3", message =>
{
Console.WriteLine("[rabbitmq_fanout] Consumed message: " + message);
});
}
}
}

class RabbitMQProducerTopicExchange
{
public static void DoWork()
{
var helper = new RabbitMQHelper();
helper.ExchangeDeclare("systemTestTopicExchange", ExchangeType.Topic);
helper.CreateQueue("systemTestRabbitmqTopicQueue1");
helper.CreateQueue("systemTestRabbitmqTopicQueue2");
helper.CreateQueue("systemTestRabbitmqTopicQueue3");
helper.QueueBind("systemTestRabbitmqTopicQueue1", "systemTestTopicExchange", "test.topic.*.cake");
helper.QueueBind("systemTestRabbitmqTopicQueue2", "systemTestTopicExchange", "test.topic.vanilla.*");
helper.QueueBind("systemTestRabbitmqTopicQueue3", "systemTestTopicExchange", "test.topic.chocolate.*");
using (var helper = new RabbitMQHelper())
{
helper.ExchangeDeclare("systemTestTopicExchange", ExchangeType.Topic);
helper.CreateQueue("systemTestRabbitmqTopicQueue1");
helper.CreateQueue("systemTestRabbitmqTopicQueue2");
helper.CreateQueue("systemTestRabbitmqTopicQueue3");
helper.QueueBind("systemTestRabbitmqTopicQueue1", "systemTestTopicExchange", "test.topic.*.cake");
helper.QueueBind("systemTestRabbitmqTopicQueue2", "systemTestTopicExchange", "test.topic.vanilla.*");
helper.QueueBind("systemTestRabbitmqTopicQueue3", "systemTestTopicExchange", "test.topic.chocolate.*");

helper.ExchangePublish("systemTestTopicExchange", "test.topic.chocolate.cake", "hello world");
helper.ExchangePublish("systemTestTopicExchange", "test.topic.chocolate.icecream", "hello world");
helper.ExchangePublish("systemTestTopicExchange", "test.topic.vanilla.icecream", "hello world");
Console.WriteLine("[rabbitmq_topic] Produced messages");
helper.ExchangePublish("systemTestTopicExchange", "test.topic.chocolate.cake", "hello world");
helper.ExchangePublish("systemTestTopicExchange", "test.topic.chocolate.icecream", "hello world");
helper.ExchangePublish("systemTestTopicExchange", "test.topic.vanilla.icecream", "hello world");
Console.WriteLine("[rabbitmq_topic] Produced messages");
}
}
}

class RabbitMQConsumerTopicExchange
{
public static void DoWork()
{
var helper = new RabbitMQHelper();
helper.ExchangeDeclare("systemTestTopicExchange", ExchangeType.Topic);
helper.CreateQueue("systemTestRabbitmqTopicQueue1");
helper.CreateQueue("systemTestRabbitmqTopicQueue2");
helper.CreateQueue("systemTestRabbitmqTopicQueue3");
helper.QueueBind("systemTestRabbitmqTopicQueue1", "systemTestTopicExchange", "test.topic.*.cake");
helper.QueueBind("systemTestRabbitmqTopicQueue2", "systemTestTopicExchange", "test.topic.vanilla.*");
helper.QueueBind("systemTestRabbitmqTopicQueue3", "systemTestTopicExchange", "test.topic.chocolate.*");

helper.AddListener("systemTestRabbitmqTopicQueue1", message =>
{
Console.WriteLine("[rabbitmq_topic] Consumed message from queue1: " + message);
});
helper.AddListener("systemTestRabbitmqTopicQueue2", message =>
using (var helper = new RabbitMQHelper())
{
Console.WriteLine("[rabbitmq_topic] Consumed message from queue2: " + message);
});
helper.AddListener("systemTestRabbitmqTopicQueue3", message =>
{
Console.WriteLine("[rabbitmq_topic] Consumed message from queue3: " + message);
});
helper.ExchangeDeclare("systemTestTopicExchange", ExchangeType.Topic);
helper.CreateQueue("systemTestRabbitmqTopicQueue1");
helper.CreateQueue("systemTestRabbitmqTopicQueue2");
helper.CreateQueue("systemTestRabbitmqTopicQueue3");
helper.QueueBind("systemTestRabbitmqTopicQueue1", "systemTestTopicExchange", "test.topic.*.cake");
helper.QueueBind("systemTestRabbitmqTopicQueue2", "systemTestTopicExchange", "test.topic.vanilla.*");
helper.QueueBind("systemTestRabbitmqTopicQueue3", "systemTestTopicExchange", "test.topic.chocolate.*");

helper.AddListener("systemTestRabbitmqTopicQueue1", message =>
{
Console.WriteLine("[rabbitmq_topic] Consumed message from queue1: " + message);
});
helper.AddListener("systemTestRabbitmqTopicQueue2", message =>
{
Console.WriteLine("[rabbitmq_topic] Consumed message from queue2: " + message);
});
helper.AddListener("systemTestRabbitmqTopicQueue3", message =>
{
Console.WriteLine("[rabbitmq_topic] Consumed message from queue3: " + message);
});
}
}
}

Expand Down
Loading