Skip to content

Selenium: Verify consumer state when using SAC (backport #13950) #13968

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1529,9 +1529,12 @@ activate_next_consumer(#?STATE{consumers = Cons0,
State = State0#?STATE{consumers = Cons,
service_queue = ServiceQueue1,
waiting_consumers = Waiting},
Effects1 = consumer_update_active_effects(State, Active,
false, waiting,
Effects0),
Effects = consumer_update_active_effects(State, Consumer,
true, single_active,
Effects0),
Effects1),
{State, Effects};
{{ActiveCKey, ?CONSUMER_PRIORITY(ActivePriority) = Active},
{_NextCKey, ?CONSUMER_PRIORITY(WaitingPriority)}}
Expand Down Expand Up @@ -1829,8 +1832,22 @@ complete_and_checkout(#{} = Meta, MsgIds, ConsumerKey,
Effects0, State0) ->
State1 = complete(Meta, ConsumerKey, MsgIds, Con0, State0),
%% a completion could have removed the active/quiescing consumer
{State2, Effects1} = activate_next_consumer(State1, Effects0),
checkout(Meta, State0, State2, Effects1).
Effects1 = add_active_effect(Con0, State1, Effects0),
{State2, Effects2} = activate_next_consumer(State1, Effects1),
checkout(Meta, State0, State2, Effects2).

add_active_effect(#consumer{status = quiescing} = Consumer,
#?STATE{cfg = #cfg{consumer_strategy = single_active},
consumers = Consumers} = State,
Effects) ->
case active_consumer(Consumers) of
undefined ->
consumer_update_active_effects(State, Consumer, false, waiting, Effects);
_ ->
Effects
end;
add_active_effect(_, _, Effects) ->
Effects.

cancel_consumer_effects(ConsumerId,
#?STATE{cfg = #cfg{resource = QName}},
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_management/priv/www/js/tmpl/consumers.ejs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<% if (consumers.length > 0) { %>
<table class="list">
<table class="list" id="consumers">
<thead>
<tr>
<% if (mode == 'queue') { %>
Expand Down
12 changes: 6 additions & 6 deletions deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
<% } %>

<h3>Details</h3>
<table class="facts facts-l">
<table class="facts facts-l" id="details-queue-table">
<tr>
<th>Features</th>
<td><%= fmt_features(queue) %></td>
<td id="details-queue-features"><%= fmt_features(queue) %></td>
</tr>
<% if(!disable_stats) { %>
<tr>
Expand Down Expand Up @@ -76,20 +76,20 @@
</table>

<% if(!disable_stats) { %>
<table class="facts facts-l">
<table class="facts facts-l" id="details-queue-stats-table">
<tr>
<th>State</th>
<td><%= fmt_object_state(queue) %></td>
</tr>
<% if(queue.consumers) { %>
<tr>
<th>Consumers</th>
<td><%= fmt_string(queue.consumers) %></td>
<td id="consumers"><%= fmt_string(queue.consumers) %></td>
</tr>
<% } else if(queue.hasOwnProperty('consumer_details')) { %>
<tr>
<th>Consumers</th>
<td><%= fmt_string(queue.consumer_details.length) %></td>
<td id="consumers"><%= fmt_string(queue.consumer_details.length) %></td>
</tr>
<% } %>
<% if (is_classic(queue)) { %>
Expand Down Expand Up @@ -277,7 +277,7 @@
<% } %>

<% if(!disable_stats) { %>
<div class="section-hidden">
<div class="section-hidden" id="queue-consumers-section">
<h2 class="updatable">Consumers (<%=(queue.consumer_details.length)%>) </h2>
<div class="hider updatable">
<%= format('consumers', {'mode': 'queue', 'consumers': queue.consumer_details}) %>
Expand Down
108 changes: 108 additions & 0 deletions deps/rabbitmq_management/priv/www/js/tmpl/quorum-queue-stats.ejs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<table class="facts facts-l" id="details-queue-stats-table">
<tbody>
<tr>
<th>State</th>
<td><%= fmt_object_state(queue) %></td>
</tr>
<% if(queue.consumers) { %>
<tr>
<th>Consumers</th>
<td id="consumers"><%= fmt_string(queue.consumers) %></td>
</tr>
<% } else if(queue.hasOwnProperty('consumer_details')) { %>
<tr>
<th>Consumers</th>
<td id="consumers"><%= fmt_string(queue.consumer_details.length) %></td>
</tr>
<% } %>
<% if(queue.hasOwnProperty('publishers')) { %>
<tr>
<th>Publishers</th>
<td id="publishers"><%= fmt_string(queue.publishers) %></td>
</tr>
<% } %>
<tr>
<th>Open files</th>
<td><%= fmt_table_short(queue.open_files) %></td>
</tr>
<% if (queue.hasOwnProperty('delivery_limit')) { %>
<tr>
<th>Delivery limit <span class="help" id="queue-delivery-limit"></th>
<td><%= fmt_string(queue.delivery_limit) %></td>
</tr>
<% } %>
</tbody>
</table>

<table class="facts">
<tr>
<td></td>
<th class="horizontal">Total</th>
<th class="horizontal">Ready</th>
<th class="horizontal">Unacked</th>
<th class="horizontal">High priority</th>
<th class="horizontal">Normal priority</th>
<th class="horizontal">Returned</th>
<th class="horizontal">Dead-lettered
<span class="help" id="queue-dead-lettered"></span>
</th>
</tr>
<tr>
<th>
Messages
<span class="help" id="queue-messages"></span>
</th>
<td class="r">
<%= fmt_num_thousands(queue.messages) %>
</td>
<td class="r">
<%= fmt_num_thousands(queue.messages_ready) %>
</td>
<td class="r">
<%= fmt_num_thousands(queue.messages_unacknowledged) %>
</td>
<td class="r">
<%= fmt_num_thousands(queue.messages_ready_high) %>
</td>
<td class="r">
<%= fmt_num_thousands(queue.messages_ready_normal) %>
</td>
<td class="r">
<%= fmt_num_thousands(queue.messages_ready_returned) %>
</td>
<td class="r">
<%= fmt_num_thousands(queue.messages_dlx) %>
</td>
</tr>
<tr>
<th>
Message body bytes
<span class="help" id="queue-message-body-bytes"></span>
</th>
<td class="r">
<%= fmt_bytes(queue.message_bytes) %>
</td>
<td class="r">
<%= fmt_bytes(queue.message_bytes_ready) %>
</td>
<td class="r">
<%= fmt_bytes(queue.message_bytes_unacknowledged) %>
</td>
<td class="r">
</td>
<td class="r">
</td>
<td class="r">
</td>
<td class="r">
<%= fmt_bytes(queue.message_bytes_dlx) %>
</td>
</tr>
<tr>
<th>
Process memory
<span class="help" id="queue-process-memory"></span>
</th>
<td class="r"><%= fmt_bytes(queue.memory) %></td>
</tr>
</table>
3 changes: 1 addition & 2 deletions selenium/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# syntax=docker/dockerfile:1
FROM atools/jdk-maven-node:mvn3-jdk11-node16 as base
FROM node:18 as base

WORKDIR /code

Expand Down
3 changes: 2 additions & 1 deletion selenium/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"proxy": "^1.0.2",
"rhea": "^3.0.3",
"selenium-webdriver": "^4.26.0",
"xmlhttprequest": "^1.8.0"
"xmlhttprequest": "^1.8.0",
"amqplib": "0.8.0"
},
"devDependencies": {
"chai": "^4.3.6",
Expand Down
23 changes: 19 additions & 4 deletions selenium/test/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var connectionOptions = getConnectionOptions()

function getAmqpConnectionOptions() {
return {
'scheme': process.env.RABBITMQ_AMQP_SCHEME || 'amqp',
'host': process.env.RABBITMQ_HOSTNAME || 'rabbitmq',
'port': process.env.RABBITMQ_AMQP_PORT || 5672,
'username' : process.env.RABBITMQ_AMQP_USERNAME || 'guest',
Expand Down Expand Up @@ -39,21 +40,28 @@ function getConnectionOptions() {
}
}
module.exports = {

open: () => {
getAmqpConnectionOptions: () => { return connectionOptions },
getAmqpUrl: () => {
return connectionOptions.scheme + '://' +
connectionOptions.username + ":" + connectionOptions.password + "@" +
connectionOptions.host + ":" + connectionOptions.port
},
open: (queueName = "my-queue") => {
let promise = new Promise((resolve, reject) => {
container.on('connection_open', function(context) {
resolve()
})
})
console.log("Opening amqp connection using " + JSON.stringify(connectionOptions))

let connection = container.connect(connectionOptions)
let receiver = connection.open_receiver({
source: 'my-queue',
source: queueName,
target: 'receiver-target',
name: 'receiver-link'
})
let sender = connection.open_sender({
target: 'my-queue',
target: queueName,
source: 'sender-source',
name: 'sender-link'
})
Expand All @@ -64,6 +72,13 @@ module.exports = {
'sender' : sender
}
},
openReceiver: (handler, queueName = "my-queue") => {
return handler.connection.open_receiver({
source: queueName,
target: 'receiver-target',
name: 'receiver-link'
})
},
close: (connection) => {
if (connection != null) {
connection.close()
Expand Down
3 changes: 2 additions & 1 deletion selenium/test/exchanges/management.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ describe('Exchange management', function () {
if (!await overview.isLoaded()) {
throw new Error('Failed to login')
}
overview.clickOnExchangesTab()
await overview.selectRefreshOption("Do not refresh")
await overview.clickOnExchangesTab()
})

it('display summary of exchanges', async function () {
Expand Down
24 changes: 9 additions & 15 deletions selenium/test/mgt-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,17 @@ module.exports = {
throw new Error(req.responseText)
}
},
createVhost: (url, name, description = "", tags = []) => {
createVhost: (url, authorization, name, description = "", tags = []) => {
let vhost = {
"description": description,
"tags": tags
}
log("Create vhost " + JSON.stringify(vhost)
+ " with name " + name + " on " + url)
const req = new XMLHttpRequest()
let base64Credentials = btoa('administrator-only' + ":" + 'guest')
let finalUrl = url + "/api/vhosts/" + encodeURIComponent(name)
req.open('PUT', finalUrl, false)
req.setRequestHeader("Authorization", "Basic " + base64Credentials)
req.setRequestHeader("Authorization", authorization)
req.setRequestHeader('Content-Type', 'application/json')

req.send(JSON.stringify(vhost))
Expand Down Expand Up @@ -158,13 +157,12 @@ module.exports = {
throw new Error(req.responseText)
}
},
deleteVhost: (url, vhost) => {
deleteVhost: (url, authorization, vhost) => {
log("Deleting vhost " + vhost)
const req = new XMLHttpRequest()
let base64Credentials = btoa('administrator-only' + ":" + 'guest')
let finalUrl = url + "/api/vhosts/" + encodeURIComponent(vhost)
req.open('DELETE', finalUrl, false)
req.setRequestHeader("Authorization", "Basic " + base64Credentials)
req.setRequestHeader("Authorization", authorization)

req.send()
if (req.status == 200 || req.status == 204) {
Expand Down Expand Up @@ -194,21 +192,18 @@ module.exports = {
throw new Error(req.responseText)
}
},
createQueue: (url, name, vhost, queueType = "quorum") => {
createQueue: (url, authorization, vhost, name, arguments = {}) => {
log("Create queue " + JSON.stringify(name)
+ " in vhost " + vhost + " on " + url)
const req = new XMLHttpRequest()
let base64Credentials = btoa('administrator-only' + ":" + 'guest')
let finalUrl = url + "/api/queues/" + encodeURIComponent(vhost) + "/"
+ encodeURIComponent(name)
req.open('PUT', finalUrl, false)
req.setRequestHeader("Authorization", "Basic " + base64Credentials)
req.setRequestHeader("Authorization", authorization)
req.setRequestHeader('Content-Type', 'application/json')
let payload = {
"durable": true,
"arguments":{
"x-queue-type" : queueType
}
"arguments": arguments
}
req.send(JSON.stringify(payload))
if (req.status == 200 || req.status == 204 || req.status == 201) {
Expand All @@ -219,14 +214,13 @@ module.exports = {
throw new Error(req.responseText)
}
},
deleteQueue: (url, name, vhost) => {
deleteQueue: (url, authorization, vhost, name) => {
log("Deleting queue " + name + " on vhost " + vhost)
const req = new XMLHttpRequest()
let base64Credentials = btoa('administrator-only' + ":" + 'guest')
let finalUrl = url + "/api/queues/" + encodeURIComponent(vhost) + "/"
+ encodeURIComponent(name)
req.open('DELETE', finalUrl, false)
req.setRequestHeader("Authorization", "Basic " + base64Credentials)
req.setRequestHeader("Authorization", authorization)

req.send()
if (req.status == 200 || req.status == 204) {
Expand Down
Loading
Loading