Skip to content

Commit 6bceb5c

Browse files
committed
Add connectivity label to list_stream_group_consumers
1 parent dfcbfbe commit 6bceb5c

File tree

2 files changed

+48
-54
lines changed

2 files changed

+48
-54
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ wrap_cmd(Cmd) ->
152152
{sac, Cmd}.
153153

154154
%% return the current groups for a given virtual host
155+
%% (CLI command)
155156
-spec consumer_groups(binary(), [atom()]) ->
156157
{ok,
157158
[term()]} | {error, sac_error() | term()}.
@@ -173,6 +174,7 @@ consumer_groups(VirtualHost, InfoKeys) ->
173174
end.
174175

175176
%% get the consumers of a given group in a given virtual host
177+
%% (CLI command)
176178
-spec group_consumers(binary(), binary(), binary(), [atom()]) ->
177179
{ok, [term()]} |
178180
{error, sac_error() | term()}.
@@ -544,6 +546,7 @@ maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G,
544546
end
545547
end.
546548

549+
%% used by CLI
547550
-spec consumer_groups(binary(), [atom()], state()) -> {ok, [term()]}.
548551
consumer_groups(VirtualHost, InfoKeys, #?MODULE{groups = Groups} = S)
549552
when ?IS_STATE_REC(S) ->
@@ -586,38 +589,14 @@ consumer_groups(VirtualHost, InfoKeys, S) ->
586589
[atom()],
587590
state()) ->
588591
{ok, [term()]} | {error, not_found}.
589-
group_consumers(VirtualHost,
590-
Stream,
591-
Reference,
592-
InfoKeys,
592+
group_consumers(VH, St, Ref, InfoKeys,
593593
#?MODULE{groups = Groups} = S)
594594
when ?IS_STATE_REC(S) ->
595-
GroupId = {VirtualHost, Stream, Reference},
595+
GroupId = {VH, St, Ref},
596596
case Groups of
597597
#{GroupId := #group{consumers = Consumers}} ->
598-
Cs = lists:foldr(fun(#consumer{subscription_id = SubId,
599-
owner = Owner,
600-
status = Status},
601-
Acc) ->
602-
Record =
603-
lists:foldr(fun (subscription_id, RecAcc) ->
604-
[{subscription_id,
605-
SubId}
606-
| RecAcc];
607-
(connection_name, RecAcc) ->
608-
[{connection_name,
609-
Owner}
610-
| RecAcc];
611-
(state, RecAcc) ->
612-
[{state, cli_consumer_status_label(Status)}
613-
| RecAcc];
614-
(Unknown, RecAcc) ->
615-
[{Unknown,
616-
unknown_field}
617-
| RecAcc]
618-
end,
619-
[], InfoKeys),
620-
[Record | Acc]
598+
Cs = lists:foldr(fun(C, Acc) ->
599+
[csr_cli_record(C, InfoKeys) | Acc]
621600
end,
622601
[], Consumers),
623602
{ok, Cs};
@@ -628,12 +607,27 @@ group_consumers(VirtualHost, Stream, Reference, InfoKeys, S) ->
628607
rabbit_stream_sac_coordinator_v4:group_consumers(VirtualHost, Stream,
629608
Reference, InfoKeys, S).
630609

631-
cli_consumer_status_label({?PDOWN, _}) ->
632-
inactive;
633-
cli_consumer_status_label({_, ?ACTIVE}) ->
634-
active;
635-
cli_consumer_status_label(_) ->
636-
inactive.
610+
csr_cli_record(#consumer{subscription_id = SubId, owner = Owner,
611+
status = Status}, InfoKeys) ->
612+
lists:foldr(fun (subscription_id, Acc) ->
613+
[{subscription_id, SubId} | Acc];
614+
(connection_name, Acc) ->
615+
[{connection_name, Owner} | Acc];
616+
(state, Acc) ->
617+
[{state, cli_csr_status_label(Status)} | Acc];
618+
(Unknown, Acc) ->
619+
[{Unknown, unknown_field} | Acc]
620+
end,
621+
[], InfoKeys).
622+
623+
624+
cli_csr_status_label({Cnty, Acty}) ->
625+
rabbit_data_coercion:to_list(Acty) ++ " (" ++ connectivity_label(Cnty) ++ ")".
626+
627+
connectivity_label(?PDOWN) ->
628+
"presumed down";
629+
connectivity_label(Cnty) ->
630+
rabbit_data_coercion:to_list(Cnty).
637631

638632
-spec ensure_monitors(command(),
639633
state(),

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ list_consumer_groups_run(Config) ->
378378
{ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
379379

380380
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
381-
{S, C} = start_stream_connection(StreamPort),
381+
{S, C0} = start_stream_connection(StreamPort),
382382
?awaitMatch(1, connection_count(Config), ?WAIT),
383383

384384
ConsumerReference = <<"foo">>,
@@ -387,34 +387,34 @@ list_consumer_groups_run(Config) ->
387387
<<"name">> => ConsumerReference},
388388

389389
Stream1 = <<"list_consumer_groups_run_1">>,
390-
create_stream(S, Stream1, C),
391-
subscribe(S, 0, Stream1, SubProperties, C),
392-
handle_consumer_update(S, C, 0),
393-
subscribe(S, 1, Stream1, SubProperties, C),
394-
subscribe(S, 2, Stream1, SubProperties, C),
390+
C1 = create_stream(S, Stream1, C0),
391+
C2 = subscribe(S, 0, Stream1, SubProperties, C1),
392+
C3 = handle_consumer_update(S, C2, 0),
393+
C4 = subscribe(S, 1, Stream1, SubProperties, C3),
394+
C5 = subscribe(S, 2, Stream1, SubProperties, C4),
395395

396396
?awaitMatch(3, consumer_count(Config), ?WAIT),
397397

398398
{ok, [CG1]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
399399
assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1),
400400

401401
Stream2 = <<"list_consumer_groups_run_2">>,
402-
create_stream(S, Stream2, C),
403-
subscribe(S, 3, Stream2, SubProperties, C),
404-
handle_consumer_update(S, C, 3),
405-
subscribe(S, 4, Stream2, SubProperties, C),
406-
subscribe(S, 5, Stream2, SubProperties, C),
402+
C6 = create_stream(S, Stream2, C5),
403+
C7 = subscribe(S, 3, Stream2, SubProperties, C6),
404+
C8 = handle_consumer_update(S, C7, 3),
405+
C9 = subscribe(S, 4, Stream2, SubProperties, C8),
406+
C10 = subscribe(S, 5, Stream2, SubProperties, C9),
407407

408408
?awaitMatch(3 + 3, consumer_count(Config), ?WAIT),
409409

410410
{ok, [CG1, CG2]} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
411411
assertConsumerGroup(Stream1, ConsumerReference, -1, 3, CG1),
412412
assertConsumerGroup(Stream2, ConsumerReference, -1, 3, CG2),
413413

414-
delete_stream(S, Stream1, C),
415-
delete_stream(S, Stream2, C),
414+
C11 = delete_stream(S, Stream1, C10),
415+
C12 = delete_stream(S, Stream2, C11),
416416

417-
close(S, C),
417+
close(S, C12),
418418
{ok, []} = ?COMMAND_LIST_CONSUMER_GROUPS:run([], Opts),
419419
ok.
420420

@@ -490,9 +490,9 @@ list_group_consumers_run(Config) ->
490490

491491
{ok, Consumers1} =
492492
?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup1),
493-
?assertEqual([[{subscription_id, 0}, {state, active}],
494-
[{subscription_id, 1}, {state, inactive}],
495-
[{subscription_id, 2}, {state, inactive}]],
493+
?assertEqual([[{subscription_id, 0}, {state, "active (connected)"}],
494+
[{subscription_id, 1}, {state, "waiting (connected)"}],
495+
[{subscription_id, 2}, {state, "waiting (connected)"}]],
496496
Consumers1),
497497

498498
Stream2 = <<"list_group_consumers_run_2">>,
@@ -510,9 +510,9 @@ list_group_consumers_run(Config) ->
510510

511511
{ok, Consumers2} =
512512
?COMMAND_LIST_GROUP_CONSUMERS:run(Args, OptsGroup2),
513-
?assertEqual([[{subscription_id, 3}, {state, active}],
514-
[{subscription_id, 4}, {state, inactive}],
515-
[{subscription_id, 5}, {state, inactive}]],
513+
?assertEqual([[{subscription_id, 3}, {state, "active (connected)"}],
514+
[{subscription_id, 4}, {state, "waiting (connected)"}],
515+
[{subscription_id, 5}, {state, "waiting (connected)"}]],
516516
Consumers2),
517517

518518
delete_stream(S, Stream1, C),

0 commit comments

Comments
 (0)