@@ -30,16 +30,16 @@ import (
3030 ulog "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"
3131)
3232
33- // Aggregator aggregates the total of resources of all Elastic managed components
34- type Aggregator struct {
33+ // aggregator aggregates the total of resources of all Elastic managed components
34+ type aggregator struct {
3535 client k8s.Client
3636}
3737
38- type aggregate func (ctx context.Context ) (resource. Quantity , error )
38+ type aggregate func (ctx context.Context ) (managedMemory , error )
3939
40- // AggregateMemory aggregates the total memory of all Elastic managed components
41- func (a Aggregator ) AggregateMemory (ctx context.Context ) (resource. Quantity , error ) {
42- var totalMemory resource. Quantity
40+ // aggregateMemory aggregates the total memory of all Elastic managed components
41+ func (a aggregator ) aggregateMemory (ctx context.Context ) (memoryUsage , error ) {
42+ usage := newMemoryUsage ()
4343
4444 for _ , f := range []aggregate {
4545 a .aggregateElasticsearchMemory ,
@@ -50,19 +50,19 @@ func (a Aggregator) AggregateMemory(ctx context.Context) (resource.Quantity, err
5050 } {
5151 memory , err := f (ctx )
5252 if err != nil {
53- return resource. Quantity {}, err
53+ return memoryUsage {}, err
5454 }
55- totalMemory . Add (memory )
55+ usage . add (memory )
5656 }
5757
58- return totalMemory , nil
58+ return usage , nil
5959}
6060
61- func (a Aggregator ) aggregateElasticsearchMemory (ctx context.Context ) (resource. Quantity , error ) {
61+ func (a aggregator ) aggregateElasticsearchMemory (ctx context.Context ) (managedMemory , error ) {
6262 var esList esv1.ElasticsearchList
6363 err := a .client .List (context .Background (), & esList )
6464 if err != nil {
65- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate Elasticsearch memory" )
65+ return managedMemory {}, errors .Wrap (err , "failed to aggregate Elasticsearch memory" )
6666 }
6767
6868 var total resource.Quantity
@@ -75,7 +75,7 @@ func (a Aggregator) aggregateElasticsearchMemory(ctx context.Context) (resource.
7575 nodespec .DefaultMemoryLimits ,
7676 )
7777 if err != nil {
78- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate Elasticsearch memory" )
78+ return managedMemory {}, errors .Wrap (err , "failed to aggregate Elasticsearch memory" )
7979 }
8080
8181 total .Add (multiply (mem , nodeSet .Count ))
@@ -84,14 +84,14 @@ func (a Aggregator) aggregateElasticsearchMemory(ctx context.Context) (resource.
8484 }
8585 }
8686
87- return total , nil
87+ return managedMemory { total , elasticsearchKey } , nil
8888}
8989
90- func (a Aggregator ) aggregateEnterpriseSearchMemory (ctx context.Context ) (resource. Quantity , error ) {
90+ func (a aggregator ) aggregateEnterpriseSearchMemory (ctx context.Context ) (managedMemory , error ) {
9191 var entList entv1.EnterpriseSearchList
9292 err := a .client .List (context .Background (), & entList )
9393 if err != nil {
94- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate Enterprise Search memory" )
94+ return managedMemory {}, errors .Wrap (err , "failed to aggregate Enterprise Search memory" )
9595 }
9696
9797 var total resource.Quantity
@@ -103,22 +103,22 @@ func (a Aggregator) aggregateEnterpriseSearchMemory(ctx context.Context) (resour
103103 enterprisesearch .DefaultMemoryLimits ,
104104 )
105105 if err != nil {
106- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate Enterprise Search memory" )
106+ return managedMemory {}, errors .Wrap (err , "failed to aggregate Enterprise Search memory" )
107107 }
108108
109109 total .Add (multiply (mem , ent .Spec .Count ))
110110 ulog .FromContext (ctx ).V (1 ).Info ("Collecting" , "namespace" , ent .Namespace , "ent_name" , ent .Name ,
111111 "memory" , mem .String (), "count" , ent .Spec .Count )
112112 }
113113
114- return total , nil
114+ return managedMemory { total , entSearchKey } , nil
115115}
116116
117- func (a Aggregator ) aggregateKibanaMemory (ctx context.Context ) (resource. Quantity , error ) {
117+ func (a aggregator ) aggregateKibanaMemory (ctx context.Context ) (managedMemory , error ) {
118118 var kbList kbv1.KibanaList
119119 err := a .client .List (context .Background (), & kbList )
120120 if err != nil {
121- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate Kibana memory" )
121+ return managedMemory {}, errors .Wrap (err , "failed to aggregate Kibana memory" )
122122 }
123123
124124 var total resource.Quantity
@@ -130,22 +130,22 @@ func (a Aggregator) aggregateKibanaMemory(ctx context.Context) (resource.Quantit
130130 kibana .DefaultMemoryLimits ,
131131 )
132132 if err != nil {
133- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate Kibana memory" )
133+ return managedMemory {}, errors .Wrap (err , "failed to aggregate Kibana memory" )
134134 }
135135
136136 total .Add (multiply (mem , kb .Spec .Count ))
137137 ulog .FromContext (ctx ).V (1 ).Info ("Collecting" , "namespace" , kb .Namespace , "kibana_name" , kb .Name ,
138138 "memory" , mem .String (), "count" , kb .Spec .Count )
139139 }
140140
141- return total , nil
141+ return managedMemory { total , kibanaKey } , nil
142142}
143143
144- func (a Aggregator ) aggregateLogstashMemory (ctx context.Context ) (resource. Quantity , error ) {
144+ func (a aggregator ) aggregateLogstashMemory (ctx context.Context ) (managedMemory , error ) {
145145 var lsList lsv1alpha1.LogstashList
146146 err := a .client .List (context .Background (), & lsList )
147147 if err != nil {
148- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate Logstash memory" )
148+ return managedMemory {}, errors .Wrap (err , "failed to aggregate Logstash memory" )
149149 }
150150
151151 var total resource.Quantity
@@ -157,22 +157,22 @@ func (a Aggregator) aggregateLogstashMemory(ctx context.Context) (resource.Quant
157157 logstash .DefaultMemoryLimit ,
158158 )
159159 if err != nil {
160- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate Logstash memory" )
160+ return managedMemory {}, errors .Wrap (err , "failed to aggregate Logstash memory" )
161161 }
162162
163163 total .Add (multiply (mem , ls .Spec .Count ))
164164 ulog .FromContext (ctx ).V (1 ).Info ("Collecting" , "namespace" , ls .Namespace , "logstash_name" , ls .Name ,
165165 "memory" , mem .String (), "count" , ls .Spec .Count )
166166 }
167167
168- return total , nil
168+ return managedMemory { total , logstashKey } , nil
169169}
170170
171- func (a Aggregator ) aggregateApmServerMemory (ctx context.Context ) (resource. Quantity , error ) {
171+ func (a aggregator ) aggregateApmServerMemory (ctx context.Context ) (managedMemory , error ) {
172172 var asList apmv1.ApmServerList
173173 err := a .client .List (context .Background (), & asList )
174174 if err != nil {
175- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate APM Server memory" )
175+ return managedMemory {}, errors .Wrap (err , "failed to aggregate APM Server memory" )
176176 }
177177
178178 var total resource.Quantity
@@ -184,15 +184,15 @@ func (a Aggregator) aggregateApmServerMemory(ctx context.Context) (resource.Quan
184184 apmserver .DefaultMemoryLimits ,
185185 )
186186 if err != nil {
187- return resource. Quantity {}, errors .Wrap (err , "failed to aggregate APM Server memory" )
187+ return managedMemory {}, errors .Wrap (err , "failed to aggregate APM Server memory" )
188188 }
189189
190190 total .Add (multiply (mem , as .Spec .Count ))
191191 ulog .FromContext (ctx ).V (1 ).Info ("Collecting" , "namespace" , as .Namespace , "as_name" , as .Name ,
192192 "memory" , mem .String (), "count" , as .Spec .Count )
193193 }
194194
195- return total , nil
195+ return managedMemory { total , apmKey } , nil
196196}
197197
198198// containerMemLimits reads the container memory limits from the resource specification with fallback
0 commit comments