@@ -18,7 +18,9 @@ package server
18
18
import (
19
19
"encoding/json"
20
20
"fmt"
21
+ "math"
21
22
"math/rand"
23
+ "strconv"
22
24
"sync"
23
25
"sync/atomic"
24
26
"testing"
@@ -991,6 +993,90 @@ func BenchmarkJetStreamPublish(b *testing.B) {
991
993
}
992
994
}
993
995
996
+ func BenchmarkJetStreamMetaSnapshot (b * testing.B ) {
997
+ c := createJetStreamClusterExplicit (b , "R3S" , 3 )
998
+ defer c .shutdown ()
999
+
1000
+ setup := func (reqLevel string ) * jetStream {
1001
+ ml := c .leader ()
1002
+ acc , js := ml .globalAccount (), ml .getJetStream ()
1003
+ n := js .getMetaGroup ()
1004
+
1005
+ // Create all streams and consumers.
1006
+ numStreams := 200
1007
+ numConsumers := 500
1008
+ ci := & ClientInfo {Cluster : "R3S" , Account : globalAccountName }
1009
+ js .mu .Lock ()
1010
+ metadata := map [string ]string {JSRequiredLevelMetadataKey : reqLevel }
1011
+ for i := 0 ; i < numStreams ; i ++ {
1012
+ scfg := & StreamConfig {
1013
+ Name : fmt .Sprintf ("STREAM-%d" , i ),
1014
+ Subjects : []string {fmt .Sprintf ("SUBJECT-%d" , i )},
1015
+ Storage : MemoryStorage ,
1016
+ Metadata : metadata ,
1017
+ }
1018
+ cfg , _ := ml .checkStreamCfg (scfg , acc , false )
1019
+ rg , _ := js .createGroupForStream (ci , & cfg )
1020
+ sa := & streamAssignment {Group : rg , Sync : syncSubjForStream (), Config : & cfg , Client : ci , Created : time .Now ().UTC ()}
1021
+ n .Propose (encodeAddStreamAssignment (sa ))
1022
+
1023
+ for j := 0 ; j < numConsumers ; j ++ {
1024
+ ccfg := & ConsumerConfig {
1025
+ Durable : fmt .Sprintf ("CONSUMER-%d" , j ),
1026
+ MemoryStorage : true ,
1027
+ Metadata : metadata ,
1028
+ }
1029
+ selectedLimits , _ , _ , _ := acc .selectLimits (ccfg .replicas (& cfg ))
1030
+ srvLim := & ml .getOpts ().JetStreamLimits
1031
+ setConsumerConfigDefaults (ccfg , & cfg , srvLim , selectedLimits , false )
1032
+ rg = js .cluster .createGroupForConsumer (ccfg , sa )
1033
+ ca := & consumerAssignment {Group : rg , Stream : cfg .Name , Name : ccfg .Durable , Config : ccfg , Client : ci , Created : time .Now ().UTC ()}
1034
+ n .Propose (encodeAddConsumerAssignment (ca ))
1035
+ }
1036
+ }
1037
+ js .mu .Unlock ()
1038
+
1039
+ // Wait for all servers to have created all assets.
1040
+ checkFor (b , 20 * time .Second , 200 * time .Millisecond , func () error {
1041
+ for _ , s := range c .servers {
1042
+ sjs := s .getJetStream ()
1043
+ sjs .mu .RLock ()
1044
+ streams := sjs .cluster .streams [globalAccountName ]
1045
+ if len (streams ) != numStreams {
1046
+ sjs .mu .RUnlock ()
1047
+ return fmt .Errorf ("expected %d streams, got %d" , numStreams , len (streams ))
1048
+ }
1049
+ for _ , sa := range streams {
1050
+ if nc := len (sa .consumers ); nc != numConsumers {
1051
+ sjs .mu .RUnlock ()
1052
+ return fmt .Errorf ("expected %d consumers, got %d" , numConsumers , nc )
1053
+ }
1054
+ }
1055
+ sjs .mu .RUnlock ()
1056
+ }
1057
+ return nil
1058
+ })
1059
+ return js
1060
+ }
1061
+
1062
+ for _ , t := range []struct {
1063
+ title string
1064
+ reqLevel string
1065
+ }{
1066
+ {title : "Default" , reqLevel : "0" },
1067
+ {title : "AllUnsupported" , reqLevel : strconv .Itoa (math .MaxInt )},
1068
+ } {
1069
+ b .Run (t .title , func (b * testing.B ) {
1070
+ js := setup (t .reqLevel )
1071
+ b .ResetTimer ()
1072
+ for range b .N {
1073
+ js .metaSnapshot ()
1074
+ }
1075
+ b .StopTimer ()
1076
+ })
1077
+ }
1078
+ }
1079
+
994
1080
func BenchmarkJetStreamCounters (b * testing.B ) {
995
1081
const (
996
1082
verbose = false
0 commit comments