Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ Environment variables:
| `MG_AGENT_MQTT_RETAIN` | MQTT retain flag | `false` |
| `MG_AGENT_NODERED_URL` | Node-RED API URL | `http://localhost:1880/` |
| `MG_AGENT_HEARTBEAT_INTERVAL` | Expected heartbeat interval | `10s` |
| `MG_AGENT_TELEMETRY_ENABLED` | Enable periodic gateway telemetry publishing | `false` |
| `MG_AGENT_TELEMETRY_INTERVAL` | Periodic gateway telemetry interval | `60s` |
| `MG_AGENT_TERMINAL_SESSION_TIMEOUT` | Terminal session timeout | `60s` |
| `MG_AGENT_BOOTSTRAP_URL` | Bootstrap base URL | |
| `MG_AGENT_BOOTSTRAP_EXTERNAL_ID` | Bootstrap external ID | |
Expand Down
19 changes: 19 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type config struct {
MqttCert string `env:"MG_AGENT_MQTT_CLIENT_CERT" envDefault:"client.cert"`
MqttPrivateKey string `env:"MG_AGENT_MQTT_CLIENT_KEY" envDefault:"client.key"`
HeartbeatInterval string `env:"MG_AGENT_HEARTBEAT_INTERVAL" envDefault:"10s"`
TelemetryEnabled string `env:"MG_AGENT_TELEMETRY_ENABLED" envDefault:"false"`
TelemetryInterval string `env:"MG_AGENT_TELEMETRY_INTERVAL" envDefault:"60s"`
TermSessionTimeout string `env:"MG_AGENT_TERMINAL_SESSION_TIMEOUT" envDefault:"60s"`
OTAEnabled string `env:"MG_AGENT_OTA_ENABLED" envDefault:"false"`
OTABinaryPath string `env:"MG_AGENT_OTA_BINARY_PATH" envDefault:"/usr/local/bin/agent"`
Expand All @@ -70,6 +72,7 @@ type config struct {
var (
errFailedToSetupMTLS = errors.New("Failed to set up mtls certs")
errFailedToConfigHeartbeat = errors.New("Failed to configure heartbeat")
errFailedToConfigTelemetry = errors.New("Failed to configure telemetry")
errFetchingBootstrapFailed = errors.New("Fetching bootstrap failed with error")
errInvalidRuntimeConfig = errors.New("Invalid runtime config")
)
Expand Down Expand Up @@ -244,6 +247,9 @@ func validateRuntimeConfig(cfg agent.Config) error {
if cfg.Heartbeat.Interval <= 0 {
missing = append(missing, "heartbeat.interval")
}
if cfg.Telemetry.Enabled && cfg.Telemetry.Interval <= 0 {
missing = append(missing, "telemetry.interval")
}
if len(missing) > 0 {
return errors.New(fmt.Sprintf("%s: missing required runtime fields: %s", errInvalidRuntimeConfig, strings.Join(missing, ", ")))
}
Expand All @@ -266,6 +272,18 @@ func loadEnvConfig(cfg config) (agent.Config, error) {
ch := agent.HeartbeatConfig{
Interval: interval,
}
telemetryInterval, err := time.ParseDuration(cfg.TelemetryInterval)
if err != nil {
return agent.Config{}, errors.Wrap(errFailedToConfigTelemetry, err)
}
telemetryEnabled, err := strconv.ParseBool(cfg.TelemetryEnabled)
if err != nil {
telemetryEnabled = false
}
tcTelemetry := agent.TelemetryConfig{
Enabled: telemetryEnabled,
Interval: telemetryInterval,
}
termSessionTimeout, err := time.ParseDuration(cfg.TermSessionTimeout)
if err != nil {
return agent.Config{}, err
Expand Down Expand Up @@ -318,6 +336,7 @@ func loadEnvConfig(cfg config) (agent.Config, error) {
}

c := agent.NewConfig(sc, agent.ChanConfig{}, nc, lc, mc, ch, ct, oc)
c.Telemetry = tcTelemetry
return c, nil
}

Expand Down
40 changes: 40 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type HeartbeatConfig struct {
Interval time.Duration
}

type TelemetryConfig struct {
Enabled bool `json:"enabled"`
Interval time.Duration `json:"interval"`
}

type TerminalConfig struct {
SessionTimeout time.Duration `json:"session_timeout"`
}
Expand All @@ -91,6 +96,7 @@ type Config struct {
Server ServerConfig `json:"server"`
Terminal TerminalConfig `json:"terminal"`
Heartbeat HeartbeatConfig `json:"heartbeat"`
Telemetry TelemetryConfig `json:"telemetry"`
Channels ChanConfig `json:"channels"`
NodeRed NodeRedConfig `json:"nodered"`
Log LogConfig `json:"log"`
Expand Down Expand Up @@ -139,6 +145,40 @@ func (d *HeartbeatConfig) UnmarshalJSON(b []byte) error {
}
}

// UnmarshalJSON parses the telemetry interval duration when present.
func (d *TelemetryConfig) UnmarshalJSON(b []byte) error {
var v map[string]any
if err := json.Unmarshal(b, &v); err != nil {
return err
}
if enabled, ok := v["enabled"]; ok {
switch value := enabled.(type) {
case bool:
d.Enabled = value
default:
return errors.New("invalid enabled")
}
}
interval, ok := v["interval"]
if !ok {
return nil
}
switch value := interval.(type) {
case float64:
d.Interval = time.Duration(value)
return nil
case string:
var err error
d.Interval, err = time.ParseDuration(value)
if err != nil {
return err
}
return nil
default:
return errors.New("invalid duration")
}
}

// UnmarshalJSON parses the duration from JSON.
func (d *TerminalConfig) UnmarshalJSON(b []byte) error {
var v map[string]any
Expand Down
4 changes: 4 additions & 0 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,9 @@ MG_AGENT_BOOTSTRAP_SKIP_TLS=false
## Device Manager
MG_AGENT_DEVICE_DB_PATH=/var/lib/agent/devices.db

# Telemetry
MG_AGENT_TELEMETRY_ENABLED=false
MG_AGENT_TELEMETRY_INTERVAL=60s

## Config Store
MG_AGENT_CONFIG_PATH=/var/lib/agent/agent-config.json
2 changes: 2 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ services:
MG_AGENT_OTA_BINARY_PATH: ${MG_AGENT_OTA_BINARY_PATH}
MG_AGENT_OTA_DOWNLOAD_DIR: ${MG_AGENT_OTA_DOWNLOAD_DIR}
MG_AGENT_DEVICE_DB_PATH: ${MG_AGENT_DEVICE_DB_PATH}
MG_AGENT_TELEMETRY_ENABLED: ${MG_AGENT_TELEMETRY_ENABLED}
MG_AGENT_TELEMETRY_INTERVAL: ${MG_AGENT_TELEMETRY_INTERVAL}
MG_AGENT_CONFIG_PATH: ${MG_AGENT_CONFIG_PATH}
volumes:
- /etc/ssl/certs/ca-certificates.crt:/etc/ssl/certs/ca-certificates.crt:ro
Expand Down
4 changes: 4 additions & 0 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ func New(ctx context.Context, mc paho.Client, cfg *Config, nc nodered.Client, lo
topic := fmt.Sprintf("m/%s/c/%s/gateway/heartbeat",
cfg.DomainID, cfg.Channels.DataChan())
go ag.selfHeartbeat(ctx, topic, cfg.Heartbeat.Interval, cfg.MQTT.QoS)
if cfg.Telemetry.Enabled {
telemetryTopic := fmt.Sprintf("m/%s/c/%s/msg", cfg.DomainID, cfg.Channels.DataChan())
go ag.periodicTelemetry(ctx, telemetryTopic, cfg.Telemetry.Interval, cfg.MQTT.QoS)
}

return ag, nil
}
Expand Down
108 changes: 86 additions & 22 deletions service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,24 +111,38 @@ func expectMQTTPublish(t *testing.T, mqttClient *agentmocks.MQTTClient, topic st
return mqttClient.On("Publish", topic, byte(1), true, mock.Anything).Return(token).Once()
}

func TestSelfHeartbeatPublishesRichPayload(t *testing.T) {
func TestPeriodicTelemetryPublishesPayload(t *testing.T) {
cfg := testConfig()
cfg.DomainID = domainID
cfg.Telemetry = agent.TelemetryConfig{Enabled: true, Interval: time.Hour}

mqttClient := agentmocks.NewMQTTClient(t)
nodeRed := nrmocks.NewClient(t)

mqttClient.On("IsConnected").Return(true).Maybe()
mqttClient.On("IsConnected").Maybe().Return(true)

selfHbToken := agentmocks.NewMQTTToken(t)
selfHbToken.On("Wait").Maybe().Return(true)
selfHbToken.On("Error").Maybe().Return(error(nil))
mqttClient.On("Publish", mqttTopic("data-channel", "gateway/heartbeat"),
mock.Anything, mock.Anything, mock.Anything).Maybe().Return(selfHbToken)

hbToken := agentmocks.NewMQTTToken(t)
hbToken.On("Wait").Maybe().Return(true)
hbToken.On("Error").Maybe().Return(error(nil))
mqttClient.On("Publish", mqttTopic("ctrl-channel", "services/agent/heartbeat"),
mock.Anything, mock.Anything, mock.Anything).Maybe().Return(hbToken)

published := make(chan struct{})
token := agentmocks.NewMQTTToken(t)
token.On("Wait").Return(true).Once()
token.On("Error").Run(func(_ mock.Arguments) {
telemetryToken := agentmocks.NewMQTTToken(t)
telemetryToken.On("Wait").Return(true).Once()
telemetryToken.On("Error").Run(func(_ mock.Arguments) {
close(published)
}).Return(error(nil)).Once()

mqttClient.On("Publish", mqttTopic("data-channel", "gateway/heartbeat"), cfg.MQTT.QoS, false, mock.MatchedBy(func(payload interface{}) bool {
return richHeartbeatPayload(t, payload)
})).Return(token).Once()
mqttClient.On("Publish", mqttTopic("data-channel", "msg"), byte(1), false, mock.MatchedBy(func(payload interface{}) bool {
return telemetryPayload(t, payload)
})).Return(telemetryToken).Once()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -139,11 +153,11 @@ func TestSelfHeartbeatPublishesRichPayload(t *testing.T) {
select {
case <-published:
case <-time.After(time.Second):
t.Fatal("self-heartbeat was not published")
t.Fatal("telemetry was not published")
}
}

func richHeartbeatPayload(t *testing.T, payload interface{}) bool {
func telemetryPayload(t *testing.T, payload interface{}) bool {
t.Helper()

var b []byte
Expand All @@ -162,24 +176,27 @@ func richHeartbeatPayload(t *testing.T, payload interface{}) bool {
}

records := make(map[string]senml.Record, len(pack.Records))
hasNetworkStats := false
for _, record := range pack.Records {
records[record.Name] = record
if strings.HasPrefix(record.Name, "net_") {
hasNetworkStats = true
}
}

return records["service_type"].StringValue != nil &&
*records["service_type"].StringValue == "agent" &&
records["heartbeat"].BoolValue != nil &&
*records["heartbeat"].BoolValue &&
records["fw_version"].StringValue != nil &&
*records["fw_version"].StringValue == agent.Version &&
return records["cpu_usage"].Value != nil &&
records["cpu_usage"].Unit == "%" &&
records["memory_used"].Value != nil &&
records["memory_used"].Unit == "By" &&
records["memory_free"].Value != nil &&
records["memory_free"].Unit == "By" &&
records["disk_used"].Value != nil &&
records["disk_used"].Unit == "By" &&
records["disk_free"].Value != nil &&
records["disk_free"].Unit == "By" &&
records["uptime"].Value != nil &&
records["uptime"].Unit == "s" &&
records["heap_free"].Value != nil &&
records["heap_free"].Unit == "By" &&
records["devices"].Value != nil &&
records["devices"].Unit == "count" &&
records["connected"].BoolValue != nil &&
*records["connected"].BoolValue
hasNetworkStats
}

func TestChannelConfig(t *testing.T) {
Expand Down Expand Up @@ -272,6 +289,53 @@ func TestDurationConfigUnmarshalJSON(t *testing.T) {
}
}

func TestTelemetryConfigUnmarshalJSON(t *testing.T) {
cases := []struct {
desc string
body string
enabled bool
interval time.Duration
expectErr bool
}{
{
desc: "parse enabled telemetry duration",
body: `{"enabled":true,"interval":"30s"}`,
enabled: true,
interval: 30 * time.Second,
},
{
desc: "allow disabled telemetry without interval",
body: `{"enabled":false}`,
enabled: false,
interval: 0,
},
{
desc: "reject invalid enabled type",
body: `{"enabled":"yes","interval":"30s"}`,
expectErr: true,
},
{
desc: "reject invalid duration",
body: `{"enabled":true,"interval":"soon"}`,
expectErr: true,
},
}

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
var cfg agent.TelemetryConfig
err := json.Unmarshal([]byte(tc.body), &cfg)
if tc.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tc.enabled, cfg.Enabled)
assert.Equal(t, tc.interval, cfg.Interval)
})
}
}

func TestHeartbeat(t *testing.T) {
h := agent.NewHeartbeat("nodered", "service", time.Hour)
info := h.Info()
Expand Down
Loading