From 4ea5667ca000e260753b7c78d0c62a648fbc970c Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Wed, 13 May 2026 03:42:50 +0300 Subject: [PATCH 1/7] update heatbeat authentication model Signed-off-by: nyagamunene --- middleware/logging.go | 18 +++++++++++++++ middleware/metrics.go | 9 ++++++++ pkg/conn/conn.go | 41 +++++++++++----------------------- service.go | 52 +++---------------------------------------- 4 files changed, 43 insertions(+), 77 deletions(-) diff --git a/middleware/logging.go b/middleware/logging.go index f5327e58..d3b068cf 100644 --- a/middleware/logging.go +++ b/middleware/logging.go @@ -224,6 +224,24 @@ func (lm *loggingMiddleware) OTA(ctx context.Context, url, sha256hex string, siz return lm.svc.OTA(ctx, url, sha256hex, size) } +func (lm *loggingMiddleware) UpdateLiveness(svcname, svctype string) (err error) { + defer func(begin time.Time) { + args := []any{ + slog.String("duration", time.Since(begin).String()), + slog.String("svcname", svcname), + slog.String("svctype", svctype), + } + if err != nil { + args = append(args, slog.String("error", err.Error())) + lm.logger.Warn("Update liveness failed to complete successfully.", args...) + return + } + lm.logger.Info("Update liveness completed successfully.", args...) + }(time.Now()) + + return lm.svc.UpdateLiveness(svcname, svctype) +} + func (lm *loggingMiddleware) NodeRed(cmdStr string) (resp string, err error) { defer func(begin time.Time) { args := []any{ diff --git a/middleware/metrics.go b/middleware/metrics.go index f1d19417..addf7941 100644 --- a/middleware/metrics.go +++ b/middleware/metrics.go @@ -149,6 +149,15 @@ func (ms *metricsMiddleware) OTA(ctx context.Context, url, sha256hex string, siz return ms.svc.OTA(ctx, url, sha256hex, size) } +func (ms *metricsMiddleware) UpdateLiveness(svcname, svctype string) error { + defer func(begin time.Time) { + ms.counter.With("method", "update_liveness").Add(1) + ms.latency.With("method", "update_liveness").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.UpdateLiveness(svcname, svctype) +} + func (ms *metricsMiddleware) NodeRed(cmdStr string) (string, error) { defer func(begin time.Time) { ms.counter.With("method", "nodered").Add(1) diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index fc8613a8..97b9c3dd 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -284,39 +284,24 @@ func (b *broker) handleOTACfgMsg(ctx context.Context, msg mqtt.Message) { }(context.WithoutCancel(ctx)) } -// extractHeartbeat checks whether the MQTT topic is a service heartbeat and, -// if so, returns the service name and type parsed from the topic and SenML payload. -func extractHeartbeat(mqttTopic string, payload []byte) (svcname, svctype string, ok bool) { - isEmpty := func(s string) bool { return len(s) == 0 } - channelParts := channelPartRegExp.FindStringSubmatch(mqttTopic) - if len(channelParts) < 4 || channelParts[3] == "" { - return "", "", false +func extractBrokerTopic(topic string) string { + isEmpty := func(s string) bool { + return (len(s) == 0) } - parts := filter.Drop(strings.Split(channelParts[3], "/"), isEmpty).([]string) - if len(parts) < 2 || parts[len(parts)-1] != "heartbeat" { - return "", "", false + channelParts := channelPartRegExp.FindStringSubmatch(topic) + if len(channelParts) < 4 { + return "" } - return parts[len(parts)-2], parseSvcType(payload), true -} + // channelParts[3] is the subtopic after /services + filtered := filter.Drop(strings.Split(channelParts[3], "/"), isEmpty).([]string) + brokerTopic := strings.Join(filtered, ".") -// parseSvcType extracts the service_type field from a SenML heartbeat payload, -// defaulting to "service" if the payload cannot be parsed. -func parseSvcType(payload []byte) string { - records, err := senml.Decode(payload) - if err != nil { - return "service" - } - for _, r := range records { - if r.Name == "service_type" && r.StringValue != nil { - return *r.StringValue - } - } - return "service" + return fmt.Sprintf("%s.%s", commands, brokerTopic) } -// handleMsg dispatches an inbound MQTT command to the registered handler. -func (b *broker) handleMsg(msg mqtt.Message) { - records, err := senml.Decode(msg.Payload()) +// handleMsg triggered when new message is received on MQTT broker. +func (b *broker) handleMsg(mc mqtt.Client, msg mqtt.Message) { + sm, err := senml.Decode(msg.Payload(), senml.JSON) if err != nil { b.logger.Warn("SenML decode failed", slog.Any("error", err)) return diff --git a/service.go b/service.go index 86a711dc..41c4476d 100644 --- a/service.go +++ b/service.go @@ -885,55 +885,9 @@ func (a *agent) publish(t, payload string, qos byte) error { return nil } -func (a *agent) selfHeartbeat(ctx context.Context, topic string, interval time.Duration, qos byte) { - publish := func() { - payload, err := a.selfHeartbeatPayload() - if err != nil { - a.logger.Error("failed to encode self-heartbeat", slog.Any("error", err)) - return - } - token := a.mqttClient.Publish(topic, qos, false, payload) - token.Wait() - if err := token.Error(); err != nil { - a.logger.Warn("self-heartbeat publish failed", slog.Any("error", err)) - } - } - publish() - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - publish() - case d := <-a.heartbeatIntervalCh: - ticker.Reset(d) - case <-ctx.Done(): - return - } - } -} - -func (a *agent) selfHeartbeatPayload() ([]byte, error) { - metrics.Read(heapSamples) - - heapFree := uint64(0) - if len(heapSamples) > 0 { - heapFree = heapSamples[0].Value.Uint64() - } - - deviceCount := 0 - if a.devices != nil { - n, err := a.devices.Count() - if err != nil { - a.logger.Warn("failed to count devices for self-heartbeat", slog.Any("error", err)) - } else { - deviceCount = n - } - } - - svcType := "agent" - heartbeat := true - fwVersion := Version +func (a *agent) Ping(uuid string) error { + now := float64(time.Now().Unix()) + vb := true uptime := time.Since(startTime).Seconds() heapFreeValue := float64(heapFree) deviceCountValue := float64(deviceCount) From ad13d775d393b26fdcefb214ff613980254d88e2 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Wed, 13 May 2026 12:07:41 +0300 Subject: [PATCH 2/7] remove fluxmq containers Signed-off-by: nyagamunene --- mocks/pub_sub.go | 269 ----------------------------------------------- pkg/conn/conn.go | 32 +----- service.go | 1 + 3 files changed, 6 insertions(+), 296 deletions(-) delete mode 100644 mocks/pub_sub.go diff --git a/mocks/pub_sub.go b/mocks/pub_sub.go deleted file mode 100644 index a18065f3..00000000 --- a/mocks/pub_sub.go +++ /dev/null @@ -1,269 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Code generated by mockery; DO NOT EDIT. -// github.com/vektra/mockery -// template: testify - -package mocks - -import ( - "context" - - "github.com/absmach/magistrala/pkg/messaging" - mock "github.com/stretchr/testify/mock" -) - -// NewPubSub creates a new instance of PubSub. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewPubSub(t interface { - mock.TestingT - Cleanup(func()) -}) *PubSub { - mock := &PubSub{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} - -// PubSub is an autogenerated mock type for the PubSub type -type PubSub struct { - mock.Mock -} - -type PubSub_Expecter struct { - mock *mock.Mock -} - -func (_m *PubSub) EXPECT() *PubSub_Expecter { - return &PubSub_Expecter{mock: &_m.Mock} -} - -// Close provides a mock function for the type PubSub -func (_mock *PubSub) Close() error { - ret := _mock.Called() - - if len(ret) == 0 { - panic("no return value specified for Close") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func() error); ok { - r0 = returnFunc() - } else { - r0 = ret.Error(0) - } - return r0 -} - -// PubSub_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' -type PubSub_Close_Call struct { - *mock.Call -} - -// Close is a helper method to define mock.On call -func (_e *PubSub_Expecter) Close() *PubSub_Close_Call { - return &PubSub_Close_Call{Call: _e.mock.On("Close")} -} - -func (_c *PubSub_Close_Call) Run(run func()) *PubSub_Close_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *PubSub_Close_Call) Return(err error) *PubSub_Close_Call { - _c.Call.Return(err) - return _c -} - -func (_c *PubSub_Close_Call) RunAndReturn(run func() error) *PubSub_Close_Call { - _c.Call.Return(run) - return _c -} - -// Publish provides a mock function for the type PubSub -func (_mock *PubSub) Publish(ctx context.Context, topic string, msg *messaging.Message) error { - ret := _mock.Called(ctx, topic, msg) - - if len(ret) == 0 { - panic("no return value specified for Publish") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, string, *messaging.Message) error); ok { - r0 = returnFunc(ctx, topic, msg) - } else { - r0 = ret.Error(0) - } - return r0 -} - -// PubSub_Publish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Publish' -type PubSub_Publish_Call struct { - *mock.Call -} - -// Publish is a helper method to define mock.On call -// - ctx context.Context -// - topic string -// - msg *messaging.Message -func (_e *PubSub_Expecter) Publish(ctx interface{}, topic interface{}, msg interface{}) *PubSub_Publish_Call { - return &PubSub_Publish_Call{Call: _e.mock.On("Publish", ctx, topic, msg)} -} - -func (_c *PubSub_Publish_Call) Run(run func(ctx context.Context, topic string, msg *messaging.Message)) *PubSub_Publish_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 string - if args[1] != nil { - arg1 = args[1].(string) - } - var arg2 *messaging.Message - if args[2] != nil { - arg2 = args[2].(*messaging.Message) - } - run( - arg0, - arg1, - arg2, - ) - }) - return _c -} - -func (_c *PubSub_Publish_Call) Return(err error) *PubSub_Publish_Call { - _c.Call.Return(err) - return _c -} - -func (_c *PubSub_Publish_Call) RunAndReturn(run func(ctx context.Context, topic string, msg *messaging.Message) error) *PubSub_Publish_Call { - _c.Call.Return(run) - return _c -} - -// Subscribe provides a mock function for the type PubSub -func (_mock *PubSub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { - ret := _mock.Called(ctx, cfg) - - if len(ret) == 0 { - panic("no return value specified for Subscribe") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, messaging.SubscriberConfig) error); ok { - r0 = returnFunc(ctx, cfg) - } else { - r0 = ret.Error(0) - } - return r0 -} - -// PubSub_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe' -type PubSub_Subscribe_Call struct { - *mock.Call -} - -// Subscribe is a helper method to define mock.On call -// - ctx context.Context -// - cfg messaging.SubscriberConfig -func (_e *PubSub_Expecter) Subscribe(ctx interface{}, cfg interface{}) *PubSub_Subscribe_Call { - return &PubSub_Subscribe_Call{Call: _e.mock.On("Subscribe", ctx, cfg)} -} - -func (_c *PubSub_Subscribe_Call) Run(run func(ctx context.Context, cfg messaging.SubscriberConfig)) *PubSub_Subscribe_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 messaging.SubscriberConfig - if args[1] != nil { - arg1 = args[1].(messaging.SubscriberConfig) - } - run( - arg0, - arg1, - ) - }) - return _c -} - -func (_c *PubSub_Subscribe_Call) Return(err error) *PubSub_Subscribe_Call { - _c.Call.Return(err) - return _c -} - -func (_c *PubSub_Subscribe_Call) RunAndReturn(run func(ctx context.Context, cfg messaging.SubscriberConfig) error) *PubSub_Subscribe_Call { - _c.Call.Return(run) - return _c -} - -// Unsubscribe provides a mock function for the type PubSub -func (_mock *PubSub) Unsubscribe(ctx context.Context, id string, topic string) error { - ret := _mock.Called(ctx, id, topic) - - if len(ret) == 0 { - panic("no return value specified for Unsubscribe") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = returnFunc(ctx, id, topic) - } else { - r0 = ret.Error(0) - } - return r0 -} - -// PubSub_Unsubscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Unsubscribe' -type PubSub_Unsubscribe_Call struct { - *mock.Call -} - -// Unsubscribe is a helper method to define mock.On call -// - ctx context.Context -// - id string -// - topic string -func (_e *PubSub_Expecter) Unsubscribe(ctx interface{}, id interface{}, topic interface{}) *PubSub_Unsubscribe_Call { - return &PubSub_Unsubscribe_Call{Call: _e.mock.On("Unsubscribe", ctx, id, topic)} -} - -func (_c *PubSub_Unsubscribe_Call) Run(run func(ctx context.Context, id string, topic string)) *PubSub_Unsubscribe_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 string - if args[1] != nil { - arg1 = args[1].(string) - } - var arg2 string - if args[2] != nil { - arg2 = args[2].(string) - } - run( - arg0, - arg1, - arg2, - ) - }) - return _c -} - -func (_c *PubSub_Unsubscribe_Call) Return(err error) *PubSub_Unsubscribe_Call { - _c.Call.Return(err) - return _c -} - -func (_c *PubSub_Unsubscribe_Call) RunAndReturn(run func(ctx context.Context, id string, topic string) error) *PubSub_Unsubscribe_Call { - _c.Call.Return(run) - return _c -} diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 97b9c3dd..197155a6 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -8,16 +8,12 @@ import ( "crypto/subtle" "fmt" "log/slog" - "os" "regexp" "strings" - "sync" - "syscall" "github.com/absmach/agent" - "github.com/absmach/agent/pkg/encoder" - "github.com/absmach/agent/pkg/ota" "github.com/absmach/agent/pkg/senml" + "github.com/absmach/magistrala/pkg/messaging" mqtt "github.com/eclipse/paho.mqtt.golang" "robpike.io/filter" ) @@ -272,33 +268,15 @@ func (b *broker) handleOTACfgMsg(ctx context.Context, msg mqtt.Message) { trigger, err := ota.TriggerFromRecords(records) if err != nil { - b.logger.Warn("OTA cfg trigger parse failed", slog.Any("error", err)) - return + return "service" } - - b.logger.Info("OTA cfg command", slog.String("url", trigger.URL)) - go func(ctx context.Context) { - if err := b.svc.OTA(ctx, trigger.URL, trigger.SHA256Hex, trigger.Size); err != nil { - b.logger.Warn("OTA cfg operation failed", slog.Any("error", err)) + for _, r := range sm.Records { + if r.Name == "service_type" && r.StringValue != nil { + return *r.StringValue } }(context.WithoutCancel(ctx)) } -func extractBrokerTopic(topic string) string { - isEmpty := func(s string) bool { - return (len(s) == 0) - } - channelParts := channelPartRegExp.FindStringSubmatch(topic) - if len(channelParts) < 4 { - return "" - } - // channelParts[3] is the subtopic after /services - filtered := filter.Drop(strings.Split(channelParts[3], "/"), isEmpty).([]string) - brokerTopic := strings.Join(filtered, ".") - - return fmt.Sprintf("%s.%s", commands, brokerTopic) -} - // handleMsg triggered when new message is received on MQTT broker. func (b *broker) handleMsg(mc mqtt.Client, msg mqtt.Message) { sm, err := senml.Decode(msg.Payload(), senml.JSON) diff --git a/service.go b/service.go index 41c4476d..b22a181d 100644 --- a/service.go +++ b/service.go @@ -30,6 +30,7 @@ import ( "github.com/absmach/agent/pkg/senml" "github.com/absmach/agent/pkg/terminal" "github.com/absmach/magistrala/pkg/errors" + senml "github.com/absmach/senml" paho "github.com/eclipse/paho.mqtt.golang" toml "github.com/pelletier/go-toml" ) From 1a034672df481f061a3ed8194cb220b74f9529c9 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 14 May 2026 14:22:45 +0300 Subject: [PATCH 3/7] add health metrics Signed-off-by: nyagamunene --- api/endpoints.go | 6 ++++++ api/responses.go | 21 +++++++++++++++++++++ api/transport.go | 7 +++++++ middleware/logging.go | 9 +++++++++ middleware/metrics.go | 10 ++++++++++ mocks/service.go | 37 +++++++++++++++++++++++++++++++++++++ service.go | 20 +++++++++++++++++--- version.go | 9 +++++++++ 8 files changed, 116 insertions(+), 3 deletions(-) create mode 100644 version.go diff --git a/api/endpoints.go b/api/endpoints.go index 33f26a9d..58dc5337 100644 --- a/api/endpoints.go +++ b/api/endpoints.go @@ -19,6 +19,12 @@ import ( const svcName = "agent" +func healthEndpoint(svc agent.Service) endpoint.Endpoint { + return func(_ context.Context, _ any) (any, error) { + return healthRes{Metrics: svc.Health()}, nil + } +} + func pubEndpoint(svc agent.Service) endpoint.Endpoint { return func(_ context.Context, request any) (any, error) { req := request.(pubReq) diff --git a/api/responses.go b/api/responses.go index f49ef39b..668a9f15 100644 --- a/api/responses.go +++ b/api/responses.go @@ -8,12 +8,14 @@ import ( "github.com/absmach/agent" "github.com/absmach/agent/pkg/devicemgr" + "github.com/absmach/agent/pkg/health" "github.com/absmach/magistrala" ) var ( _ magistrala.Response = (*publishRes)(nil) _ magistrala.Response = (*execRes)(nil) + _ magistrala.Response = (*healthRes)(nil) _ magistrala.Response = (*addConfigRes)(nil) _ magistrala.Response = (*viewConfigRes)(nil) _ magistrala.Response = (*viewServicesRes)(nil) @@ -244,3 +246,22 @@ func (res otaStatusRes) Headers() map[string]string { func (res otaStatusRes) Empty() bool { return false } + +type healthRes struct { + *health.Metrics +} + +func (res healthRes) Code() int { + if res.Metrics == nil { + return http.StatusServiceUnavailable + } + return http.StatusOK +} + +func (res healthRes) Headers() map[string]string { + return map[string]string{} +} + +func (res healthRes) Empty() bool { + return res.Metrics == nil +} diff --git a/api/transport.go b/api/transport.go index 921428c6..69025acc 100644 --- a/api/transport.go +++ b/api/transport.go @@ -145,6 +145,13 @@ func MakeHandler(svc agent.Service, logger *slog.Logger, stream *logstream.Strea }) r.Handle("/ui/*", http.StripPrefix("/ui", agentui.Handler())) + r.Get("/api/health", kithttp.NewServer( + healthEndpoint(svc), + decodeRequest, + EncodeResponse, + opts..., + ).ServeHTTP) + return r } diff --git a/middleware/logging.go b/middleware/logging.go index d3b068cf..74186ab1 100644 --- a/middleware/logging.go +++ b/middleware/logging.go @@ -11,6 +11,7 @@ import ( "github.com/absmach/agent" "github.com/absmach/agent/pkg/devicemgr" + "github.com/absmach/agent/pkg/health" "github.com/go-chi/chi/v5/middleware" ) @@ -242,6 +243,14 @@ func (lm *loggingMiddleware) UpdateLiveness(svcname, svctype string) (err error) return lm.svc.UpdateLiveness(svcname, svctype) } +func (lm *loggingMiddleware) Health() *health.Metrics { + defer func(begin time.Time) { + lm.logger.Info("Retrieve health completed successfully.", slog.String("duration", time.Since(begin).String())) + }(time.Now()) + + return lm.svc.Health() +} + func (lm *loggingMiddleware) NodeRed(cmdStr string) (resp string, err error) { defer func(begin time.Time) { args := []any{ diff --git a/middleware/metrics.go b/middleware/metrics.go index addf7941..d0595369 100644 --- a/middleware/metrics.go +++ b/middleware/metrics.go @@ -12,6 +12,7 @@ import ( "github.com/absmach/agent" "github.com/absmach/agent/pkg/devicemgr" + "github.com/absmach/agent/pkg/health" "github.com/go-kit/kit/metrics" ) @@ -158,6 +159,15 @@ func (ms *metricsMiddleware) UpdateLiveness(svcname, svctype string) error { return ms.svc.UpdateLiveness(svcname, svctype) } +func (ms *metricsMiddleware) Health() *health.Metrics { + defer func(begin time.Time) { + ms.counter.With("method", "health").Add(1) + ms.latency.With("method", "health").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.Health() +} + func (ms *metricsMiddleware) NodeRed(cmdStr string) (string, error) { defer func(begin time.Time) { ms.counter.With("method", "nodered").Add(1) diff --git a/mocks/service.go b/mocks/service.go index f9db0b33..b557907a 100644 --- a/mocks/service.go +++ b/mocks/service.go @@ -12,6 +12,7 @@ import ( "github.com/absmach/agent" "github.com/absmach/agent/pkg/devicemgr" + "github.com/absmach/agent/pkg/health" mock "github.com/stretchr/testify/mock" ) @@ -1304,3 +1305,39 @@ func (_c *Service_UpdateLiveness_Call) RunAndReturn(run func(svcname string, svc _c.Call.Return(run) return _c } + +func (_mock *Service) Health() *health.Metrics { + ret := _mock.Called() + var r0 *health.Metrics + if rf, ok := ret.Get(0).(func() *health.Metrics); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*health.Metrics) + } + } + return r0 +} + +type Service_Health_Call struct { + *mock.Call +} + +func (_e *Service_Expecter) Health() *Service_Health_Call { + return &Service_Health_Call{Call: _e.mock.On("Health")} +} + +func (_c *Service_Health_Call) Run(run func()) *Service_Health_Call { + _c.Call.Run(func(args mock.Arguments) { run() }) + return _c +} + +func (_c *Service_Health_Call) Return(m *health.Metrics) *Service_Health_Call { + _c.Call.Return(m) + return _c +} + +func (_c *Service_Health_Call) RunAndReturn(run func() *health.Metrics) *Service_Health_Call { + _c.Call.Return(run) + return _c +} diff --git a/service.go b/service.go index b22a181d..b361032d 100644 --- a/service.go +++ b/service.go @@ -19,12 +19,14 @@ import ( "strings" "sync" "sync/atomic" + "sync/atomic" "time" cfgstore "github.com/absmach/agent/pkg/config" "github.com/absmach/agent/pkg/devicemgr" "github.com/absmach/agent/pkg/encoder" "github.com/absmach/agent/pkg/iface" + "github.com/absmach/agent/pkg/health" "github.com/absmach/agent/pkg/nodered" "github.com/absmach/agent/pkg/ota" "github.com/absmach/agent/pkg/senml" @@ -232,6 +234,9 @@ type Service interface { type OTAStatusInfo struct { Busy bool `json:"busy"` LastError string `json:"last_error,omitempty"` + + // Health returns the latest gateway health metrics, or nil if not yet collected. + Health() *health.Metrics } var _ Service = (*agent)(nil) @@ -261,6 +266,7 @@ type agent struct { otaCancel context.CancelFunc otaAborted atomic.Bool bootstrapCachePath string + latestHealth atomic.Pointer[health.Metrics] } // New returns agent service implementation. @@ -290,9 +296,13 @@ func New(ctx context.Context, mc paho.Client, cfg *Config, nc nodered.Client, lo ag.sched = sched } - topic := fmt.Sprintf("m/%s/c/%s/gateway/heartbeat", + selfTopic := fmt.Sprintf("m/%s/c/%s/gateway/heartbeat", + cfg.DomainID, cfg.Channels.DataChan()) + go ag.selfHeartbeat(ctx, selfTopic, cfg.Heartbeat.Interval, cfg.MQTT.QoS) + + healthTopic := fmt.Sprintf("m/%s/c/%s/gateway/heartbeat", cfg.DomainID, cfg.Channels.DataChan()) - go ag.selfHeartbeat(ctx, topic, cfg.Heartbeat.Interval, cfg.MQTT.QoS) + go ag.healthHeartbeat(ctx, healthTopic, cfg.Heartbeat.Interval, cfg.MQTT.QoS) if cfg.Telemetry.Interval > 0 { telemetryTopic := fmt.Sprintf("m/%s/c/%s/gateway/telemetry", @@ -920,7 +930,7 @@ func (a *agent) selfTelemetry(ctx context.Context, topic string, interval time.D token := a.mqttClient.Publish(topic, qos, false, b) token.Wait() if err := token.Error(); err != nil { - a.logger.Warn("self-telemetry publish failed", slog.Any("error", err)) + a.logger.Warn("self-heartbeat publish failed", slog.Any("error", err)) } } @@ -1022,6 +1032,10 @@ func (a *agent) gatewayTelemetryPayload() []senml.Record { return records } +func (a *agent) Health() *health.Metrics { + return a.latestHealth.Load() +} + func (a *agent) UpdateLiveness(svcname, svctype string) error { a.svcsMu.Lock() defer a.svcsMu.Unlock() diff --git a/version.go b/version.go new file mode 100644 index 00000000..a0272d3a --- /dev/null +++ b/version.go @@ -0,0 +1,9 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package agent + +// Version is the agent binary version. It is injected at build time from the +// latest Git tag by the Makefile (make all). Defaults to "dev" for local builds +// that bypass the Makefile. +var Version = "dev" From f1bfe739839e9b44f1b7cc67a0b68f46c309126e Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 11 Jun 2026 17:00:22 +0300 Subject: [PATCH 4/7] fix issues Signed-off-by: nyagamunene --- middleware/logging.go | 18 ----- middleware/metrics.go | 9 --- pkg/conn/conn.go | 49 ++++++++++-- pkg/health/health.go | 181 ++++++++++++++++++++++++++++++++++++++++++ service.go | 120 ++++++++++++++++++++++++---- 5 files changed, 328 insertions(+), 49 deletions(-) diff --git a/middleware/logging.go b/middleware/logging.go index 74186ab1..8305c426 100644 --- a/middleware/logging.go +++ b/middleware/logging.go @@ -225,24 +225,6 @@ func (lm *loggingMiddleware) OTA(ctx context.Context, url, sha256hex string, siz return lm.svc.OTA(ctx, url, sha256hex, size) } -func (lm *loggingMiddleware) UpdateLiveness(svcname, svctype string) (err error) { - defer func(begin time.Time) { - args := []any{ - slog.String("duration", time.Since(begin).String()), - slog.String("svcname", svcname), - slog.String("svctype", svctype), - } - if err != nil { - args = append(args, slog.String("error", err.Error())) - lm.logger.Warn("Update liveness failed to complete successfully.", args...) - return - } - lm.logger.Info("Update liveness completed successfully.", args...) - }(time.Now()) - - return lm.svc.UpdateLiveness(svcname, svctype) -} - func (lm *loggingMiddleware) Health() *health.Metrics { defer func(begin time.Time) { lm.logger.Info("Retrieve health completed successfully.", slog.String("duration", time.Since(begin).String())) diff --git a/middleware/metrics.go b/middleware/metrics.go index d0595369..153004bf 100644 --- a/middleware/metrics.go +++ b/middleware/metrics.go @@ -150,15 +150,6 @@ func (ms *metricsMiddleware) OTA(ctx context.Context, url, sha256hex string, siz return ms.svc.OTA(ctx, url, sha256hex, size) } -func (ms *metricsMiddleware) UpdateLiveness(svcname, svctype string) error { - defer func(begin time.Time) { - ms.counter.With("method", "update_liveness").Add(1) - ms.latency.With("method", "update_liveness").Observe(time.Since(begin).Seconds()) - }(time.Now()) - - return ms.svc.UpdateLiveness(svcname, svctype) -} - func (ms *metricsMiddleware) Health() *health.Metrics { defer func(begin time.Time) { ms.counter.With("method", "health").Add(1) diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 197155a6..fc8613a8 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -8,12 +8,16 @@ import ( "crypto/subtle" "fmt" "log/slog" + "os" "regexp" "strings" + "sync" + "syscall" "github.com/absmach/agent" + "github.com/absmach/agent/pkg/encoder" + "github.com/absmach/agent/pkg/ota" "github.com/absmach/agent/pkg/senml" - "github.com/absmach/magistrala/pkg/messaging" mqtt "github.com/eclipse/paho.mqtt.golang" "robpike.io/filter" ) @@ -267,19 +271,52 @@ func (b *broker) handleOTACfgMsg(ctx context.Context, msg mqtt.Message) { } trigger, err := ota.TriggerFromRecords(records) + if err != nil { + b.logger.Warn("OTA cfg trigger parse failed", slog.Any("error", err)) + return + } + + b.logger.Info("OTA cfg command", slog.String("url", trigger.URL)) + go func(ctx context.Context) { + if err := b.svc.OTA(ctx, trigger.URL, trigger.SHA256Hex, trigger.Size); err != nil { + b.logger.Warn("OTA cfg operation failed", slog.Any("error", err)) + } + }(context.WithoutCancel(ctx)) +} + +// extractHeartbeat checks whether the MQTT topic is a service heartbeat and, +// if so, returns the service name and type parsed from the topic and SenML payload. +func extractHeartbeat(mqttTopic string, payload []byte) (svcname, svctype string, ok bool) { + isEmpty := func(s string) bool { return len(s) == 0 } + channelParts := channelPartRegExp.FindStringSubmatch(mqttTopic) + if len(channelParts) < 4 || channelParts[3] == "" { + return "", "", false + } + parts := filter.Drop(strings.Split(channelParts[3], "/"), isEmpty).([]string) + if len(parts) < 2 || parts[len(parts)-1] != "heartbeat" { + return "", "", false + } + return parts[len(parts)-2], parseSvcType(payload), true +} + +// parseSvcType extracts the service_type field from a SenML heartbeat payload, +// defaulting to "service" if the payload cannot be parsed. +func parseSvcType(payload []byte) string { + records, err := senml.Decode(payload) if err != nil { return "service" } - for _, r := range sm.Records { + for _, r := range records { if r.Name == "service_type" && r.StringValue != nil { return *r.StringValue } - }(context.WithoutCancel(ctx)) + } + return "service" } -// handleMsg triggered when new message is received on MQTT broker. -func (b *broker) handleMsg(mc mqtt.Client, msg mqtt.Message) { - sm, err := senml.Decode(msg.Payload(), senml.JSON) +// handleMsg dispatches an inbound MQTT command to the registered handler. +func (b *broker) handleMsg(msg mqtt.Message) { + records, err := senml.Decode(msg.Payload()) if err != nil { b.logger.Warn("SenML decode failed", slog.Any("error", err)) return diff --git a/pkg/health/health.go b/pkg/health/health.go index d2c4d7a0..be83cb33 100644 --- a/pkg/health/health.go +++ b/pkg/health/health.go @@ -4,10 +4,14 @@ package health import ( + "bufio" "context" "log/slog" "net" "os" + "runtime" + "strconv" + "strings" "sync/atomic" "syscall" "time" @@ -166,6 +170,183 @@ func (s *Supervisor) IsHealthy() bool { return s.healthy.Load() } +// Metrics holds a point-in-time snapshot of gateway health. +type Metrics struct { + Timestamp time.Time `json:"timestamp"` + Uptime float64 `json:"uptime_seconds"` + CPUUsage float64 `json:"cpu_usage_percent"` + MemAvailable uint64 `json:"mem_available_bytes"` + DiskFree uint64 `json:"disk_free_bytes"` + NetRxBytes uint64 `json:"net_rx_bytes"` + NetTxBytes uint64 `json:"net_tx_bytes"` + Goroutines int `json:"goroutines"` + Version string `json:"version"` +} + +type cpuSample struct { + total uint64 + idle uint64 +} + +type netSample struct { + rx uint64 + tx uint64 +} + +// Collector samples system metrics and computes deltas between calls. +type Collector struct { + startTime time.Time + prevCPU cpuSample + prevNet netSample + prevTime time.Time + version string +} + +// NewCollector initialises a Collector, priming the delta baseline. +func NewCollector(version string) *Collector { + c := &Collector{ + startTime: time.Now(), + prevTime: time.Now(), + version: version, + } + c.prevCPU, _ = readCPU() + c.prevNet, _ = readNet() + return c +} + +// Collect reads current system stats, computes deltas from the previous call, +// and returns a Metrics snapshot. It is safe to call concurrently. +func (c *Collector) Collect() Metrics { + now := time.Now() + + cpu, _ := readCPU() + net, _ := readNet() + mem, _ := readMem() + disk, _ := readDisk() + + var cpuPct float64 + if dTotal := cpu.total - c.prevCPU.total; dTotal > 0 { + dIdle := cpu.idle - c.prevCPU.idle + cpuPct = (1 - float64(dIdle)/float64(dTotal)) * 100 + } + + m := Metrics{ + Timestamp: now, + Uptime: now.Sub(c.startTime).Seconds(), + CPUUsage: cpuPct, + MemAvailable: mem, + DiskFree: disk, + NetRxBytes: net.rx - c.prevNet.rx, + NetTxBytes: net.tx - c.prevNet.tx, + Goroutines: runtime.NumGoroutine(), + Version: c.version, + } + + c.prevCPU = cpu + c.prevNet = net + c.prevTime = now + + return m +} + +// readCPU reads the aggregate CPU counters from /proc/stat. +func readCPU() (cpuSample, error) { + f, err := os.Open("/proc/stat") + if err != nil { + return cpuSample{}, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, "cpu ") { + continue + } + fields := strings.Fields(line)[1:] // skip "cpu" label + var vals [10]uint64 + for i := 0; i < len(fields) && i < 10; i++ { + vals[i], _ = strconv.ParseUint(fields[i], 10, 64) + } + // user nice system idle iowait irq softirq steal guest guest_nice + idle := vals[3] + vals[4] + total := vals[0] + vals[1] + vals[2] + vals[3] + vals[4] + vals[5] + vals[6] + vals[7] + return cpuSample{total: total, idle: idle}, nil + } + return cpuSample{}, scanner.Err() +} + +// readMem returns MemAvailable in bytes from /proc/meminfo. +func readMem() (uint64, error) { + f, err := os.Open("/proc/meminfo") + if err != nil { + return 0, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, "MemAvailable:") { + continue + } + fields := strings.Fields(line) + if len(fields) < 2 { + break + } + kb, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil { + return 0, err + } + return kb * 1024, nil + } + return 0, scanner.Err() +} + +// readDisk returns free bytes on the root filesystem via Statfs. +func readDisk() (uint64, error) { + var fs syscall.Statfs_t + if err := syscall.Statfs("/", &fs); err != nil { + return 0, err + } + return fs.Bavail * uint64(fs.Bsize), nil +} + +// readNet sums rx/tx bytes across all non-loopback interfaces from /proc/net/dev. +func readNet() (netSample, error) { + f, err := os.Open("/proc/net/dev") + if err != nil { + return netSample{}, err + } + defer f.Close() + + var sample netSample + scanner := bufio.NewScanner(f) + // Skip two header lines. + scanner.Scan() + scanner.Scan() + for scanner.Scan() { + line := scanner.Text() + colonIdx := strings.Index(line, ":") + if colonIdx < 0 { + continue + } + iface := strings.TrimSpace(line[:colonIdx]) + if iface == "lo" { + continue + } + fields := strings.Fields(line[colonIdx+1:]) + if len(fields) < 9 { + continue + } + rx, _ := strconv.ParseUint(fields[0], 10, 64) + tx, _ := strconv.ParseUint(fields[8], 10, 64) + sample.rx += rx + sample.tx += tx + } + return sample, scanner.Err() +} + // MQTTChecker checks if the MQTT client is connected. type MQTTChecker struct { client MQTTClient diff --git a/service.go b/service.go index b361032d..0c6bb104 100644 --- a/service.go +++ b/service.go @@ -19,20 +19,18 @@ import ( "strings" "sync" "sync/atomic" - "sync/atomic" "time" cfgstore "github.com/absmach/agent/pkg/config" "github.com/absmach/agent/pkg/devicemgr" "github.com/absmach/agent/pkg/encoder" - "github.com/absmach/agent/pkg/iface" "github.com/absmach/agent/pkg/health" + "github.com/absmach/agent/pkg/iface" "github.com/absmach/agent/pkg/nodered" "github.com/absmach/agent/pkg/ota" "github.com/absmach/agent/pkg/senml" "github.com/absmach/agent/pkg/terminal" "github.com/absmach/magistrala/pkg/errors" - senml "github.com/absmach/senml" paho "github.com/eclipse/paho.mqtt.golang" toml "github.com/pelletier/go-toml" ) @@ -83,12 +81,6 @@ const ( var ( startTime = time.Now() - // Version is the agent binary version, injected at build time via - // -ldflags "-X github.com/absmach/agent.Version=x.y.z". - Version = "0.0.0" - Commit = "unknown" - BuildTime = "unknown" - heapSamples = []metrics.Sample{{Name: "/memory/classes/heap/free:bytes"}} ) @@ -227,6 +219,9 @@ type Service interface { // OTAAbort cancels an in-progress OTA update. It returns an error if no OTA is running. OTAAbort() error + // Health returns the latest gateway health metrics, or nil if not yet collected. + Health() *health.Metrics + DeviceService } @@ -234,9 +229,6 @@ type Service interface { type OTAStatusInfo struct { Busy bool `json:"busy"` LastError string `json:"last_error,omitempty"` - - // Health returns the latest gateway health metrics, or nil if not yet collected. - Health() *health.Metrics } var _ Service = (*agent)(nil) @@ -896,9 +888,105 @@ func (a *agent) publish(t, payload string, qos byte) error { return nil } -func (a *agent) Ping(uuid string) error { - now := float64(time.Now().Unix()) - vb := true +func (a *agent) selfHeartbeat(ctx context.Context, topic string, interval time.Duration, qos byte) { + publish := func() { + payload, err := a.selfHeartbeatPayload() + if err != nil { + a.logger.Error("failed to encode self-heartbeat", slog.Any("error", err)) + return + } + token := a.mqttClient.Publish(topic, qos, false, payload) + token.Wait() + if err := token.Error(); err != nil { + a.logger.Warn("self-heartbeat publish failed", slog.Any("error", err)) + } + } + publish() + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + publish() + case d := <-a.heartbeatIntervalCh: + ticker.Reset(d) + case <-ctx.Done(): + return + } + } +} + +func (a *agent) healthHeartbeat(ctx context.Context, topic string, interval time.Duration, qos byte) { + collector := health.NewCollector(Version) + publish := func() { + m := collector.Collect() + a.latestHealth.Store(&m) + b, err := senml.EncodeRecords(healthToSenML(m)) + if err != nil { + a.logger.Warn("failed to encode health metrics", slog.Any("error", err)) + return + } + token := a.mqttClient.Publish(topic, qos, false, b) + token.Wait() + if err := token.Error(); err != nil { + a.logger.Warn("health heartbeat publish failed", slog.Any("error", err)) + } + } + publish() + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + publish() + case <-ctx.Done(): + return + } + } +} + +func healthToSenML(m health.Metrics) []senml.Record { + base := "gw:" + t := float64(m.Timestamp.UnixNano()) / float64(time.Second) + uptime := m.Uptime + cpu := m.CPUUsage + memAvail := float64(m.MemAvailable) + diskFree := float64(m.DiskFree) + netRx := float64(m.NetRxBytes) + netTx := float64(m.NetTxBytes) + goroutines := float64(m.Goroutines) + return []senml.Record{ + {BaseName: base, BaseTime: t, Name: "uptime", Unit: "s", Value: &uptime}, + {Name: "cpu_usage", Unit: "%", Value: &cpu}, + {Name: "mem_available", Unit: "By", Value: &memAvail}, + {Name: "disk_free", Unit: "By", Value: &diskFree}, + {Name: "net_rx", Unit: "By", Value: &netRx}, + {Name: "net_tx", Unit: "By", Value: &netTx}, + {Name: "goroutines", Value: &goroutines}, + } +} + +func (a *agent) selfHeartbeatPayload() ([]byte, error) { + metrics.Read(heapSamples) + + heapFree := uint64(0) + if len(heapSamples) > 0 { + heapFree = heapSamples[0].Value.Uint64() + } + + deviceCount := 0 + if a.devices != nil { + n, err := a.devices.Count() + if err != nil { + a.logger.Warn("failed to count devices for self-heartbeat", slog.Any("error", err)) + } else { + deviceCount = n + } + } + + svcType := "agent" + heartbeat := true + fwVersion := Version uptime := time.Since(startTime).Seconds() heapFreeValue := float64(heapFree) deviceCountValue := float64(deviceCount) @@ -930,7 +1018,7 @@ func (a *agent) selfTelemetry(ctx context.Context, topic string, interval time.D token := a.mqttClient.Publish(topic, qos, false, b) token.Wait() if err := token.Error(); err != nil { - a.logger.Warn("self-heartbeat publish failed", slog.Any("error", err)) + a.logger.Warn("self-telemetry publish failed", slog.Any("error", err)) } } From a962b1c1bce1f9ad6a69f8355c580bf09b4848c0 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 11 Jun 2026 17:58:03 +0300 Subject: [PATCH 5/7] fix topic Signed-off-by: nyagamunene --- service.go | 2 +- service_test.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/service.go b/service.go index 0c6bb104..db32843b 100644 --- a/service.go +++ b/service.go @@ -292,7 +292,7 @@ func New(ctx context.Context, mc paho.Client, cfg *Config, nc nodered.Client, lo cfg.DomainID, cfg.Channels.DataChan()) go ag.selfHeartbeat(ctx, selfTopic, cfg.Heartbeat.Interval, cfg.MQTT.QoS) - healthTopic := fmt.Sprintf("m/%s/c/%s/gateway/heartbeat", + healthTopic := fmt.Sprintf("m/%s/c/%s/gateway/health", cfg.DomainID, cfg.Channels.DataChan()) go ag.healthHeartbeat(ctx, healthTopic, cfg.Heartbeat.Interval, cfg.MQTT.QoS) diff --git a/service_test.go b/service_test.go index b21aaea6..562ab5fe 100644 --- a/service_test.go +++ b/service_test.go @@ -73,6 +73,8 @@ func newService(t *testing.T, cfg agent.Config, store cfgstore.Store, devices .. mqttClient.On("IsConnected").Maybe().Return(true) mqttClient.On("Publish", mqttTopic("data-channel", "gateway/heartbeat"), cfg.MQTT.QoS, mock.Anything, mock.Anything).Maybe().Return(hbToken) + mqttClient.On("Publish", mqttTopic("data-channel", "gateway/health"), + cfg.MQTT.QoS, mock.Anything, mock.Anything).Maybe().Return(hbToken) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -112,6 +114,12 @@ func TestSelfHeartbeatPublishesRichPayload(t *testing.T) { return richHeartbeatPayload(t, payload) })).Return(token).Once() + healthToken := agentmocks.NewMQTTToken(t) + healthToken.On("Wait").Maybe().Return(true) + healthToken.On("Error").Maybe().Return(error(nil)) + mqttClient.On("Publish", mqttTopic("data-channel", "gateway/health"), + cfg.MQTT.QoS, false, mock.Anything).Maybe().Return(healthToken) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -801,6 +809,8 @@ func TestBsValidCacheInvalidation(t *testing.T) { hbToken.On("Error").Maybe().Return(error(nil)) mqttClient.On("Publish", mqttTopic("data-channel", "gateway/heartbeat"), mock.Anything, mock.Anything, mock.Anything).Maybe().Return(hbToken) + mqttClient.On("Publish", mqttTopic("data-channel", "gateway/health"), + mock.Anything, mock.Anything, mock.Anything).Maybe().Return(hbToken) mqttClient.On("IsConnected").Maybe().Return(true) ctx, cancel := context.WithCancel(context.Background()) @@ -1296,6 +1306,12 @@ func TestPing(t *testing.T) { cfg.MQTT.QoS, false, mock.Anything, ).Return(startupToken).Once() + healthToken := agentmocks.NewMQTTToken(t) + healthToken.On("Wait").Maybe().Return(true) + healthToken.On("Error").Maybe().Return(error(nil)) + mqttClient.On("Publish", mqttTopic("data-channel", "gateway/health"), + cfg.MQTT.QoS, false, mock.Anything).Maybe().Return(healthToken) + ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) From f01504dd14f20ce7dd4638cbefbfa227644375da Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 11 Jun 2026 18:11:56 +0300 Subject: [PATCH 6/7] fix linter Signed-off-by: nyagamunene --- service.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/service.go b/service.go index db32843b..6ef1bb5a 100644 --- a/service.go +++ b/service.go @@ -75,6 +75,7 @@ const ( notFound = "not_found" senmlNameUptime = "uptime" + senmlBaseGW = "gw:" notAllowed = "not_allowed" ) @@ -90,7 +91,7 @@ var execAllowlist = map[string]bool{ "echo": true, "env": true, "false": true, "free": true, "hostname": true, "id": true, "ifconfig": true, "ip": true, "journalctl": true, "ls": true, "netstat": true, "ping": true, "printf": true, "ps": true, "pwd": true, - "ss": true, "systemctl": true, "true": true, "uname": true, "uptime": true, + "ss": true, "systemctl": true, "true": true, "uname": true, senmlNameUptime: true, "who": true, } @@ -258,7 +259,7 @@ type agent struct { otaCancel context.CancelFunc otaAborted atomic.Bool bootstrapCachePath string - latestHealth atomic.Pointer[health.Metrics] + latestHealth atomic.Pointer[health.Metrics] } // New returns agent service implementation. @@ -946,7 +947,7 @@ func (a *agent) healthHeartbeat(ctx context.Context, topic string, interval time } func healthToSenML(m health.Metrics) []senml.Record { - base := "gw:" + base := senmlBaseGW t := float64(m.Timestamp.UnixNano()) / float64(time.Second) uptime := m.Uptime cpu := m.CPUUsage @@ -1072,7 +1073,7 @@ func (a *agent) gatewayTelemetryPayload() []senml.Record { uptime := time.Since(startTime).Seconds() records := []senml.Record{ - {BaseName: "gw:", BaseTime: now, Name: "uptime", Unit: "s", Value: &uptime}, + {BaseName: senmlBaseGW, BaseTime: now, Name: "uptime", Unit: "s", Value: &uptime}, } if total, _, available, ok := readMemoryStats(); ok { @@ -1191,7 +1192,7 @@ func (a *agent) OTA(ctx context.Context, url, sha256hex string, size uint64) err now := float64(time.Now().UnixNano()) stateStr := strings.ToLower(state.String()) statusPack := []senml.Record{ - {BaseName: "gw:", BaseTime: now, Name: "ota_state", StringValue: &stateStr}, + {BaseName: senmlBaseGW, BaseTime: now, Name: "ota_state", StringValue: &stateStr}, {Name: "ota_progress", Unit: "%", Value: &progress}, } b, err := senml.EncodeRecords(statusPack) From 870b250e669b699a75991a115d51f5a768657a52 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 11 Jun 2026 20:17:23 +0300 Subject: [PATCH 7/7] fix linter Signed-off-by: nyagamunene --- service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service.go b/service.go index 6ef1bb5a..59a2e67b 100644 --- a/service.go +++ b/service.go @@ -259,7 +259,7 @@ type agent struct { otaCancel context.CancelFunc otaAborted atomic.Bool bootstrapCachePath string - latestHealth atomic.Pointer[health.Metrics] + latestHealth atomic.Pointer[health.Metrics] } // New returns agent service implementation.