Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

Consul ESM (External Service Monitor) is a daemon that monitors health checks for external nodes registered in Consul's catalog. It runs alongside Consul to perform health checks for nodes that don't run Consul agents, and can optionally update network coordinates for these nodes.

**Language:** Go
**Requires:** Consul 1.4.1+

## Common Development Commands

### Building
- `make dev` - Build and install binary locally to `$GOPATH/bin` and `bin/consul-esm`
- `make build` - Build Linux AMD64 binary to `dist/linux/amd64/`
- `make docker` - Build Docker image

### Testing
- `make test` - Run all tests with 60s timeout
- `make test-race` - Run tests with race detector enabled
- `go test ./... -run TestFunctionName` - Run specific test by name
- `go test -v ./...` - Run tests with verbose output

### Running Locally
```bash
consul-esm -config-file=/path/to/config.hcl
# or
consul-esm -config-dir=/path/to/config/dir
```

## Architecture

### Core Components

**Agent (`agent.go`)**
- Entry point for ESM functionality
- Handles registration with Consul (either via agent API or catalog API in agentless mode)
- Manages leader election and coordination
- Spawns goroutines for registration checks, TTL/session management, node watching, and leader loop
- Two operating modes:
- **Agent mode** (default): Registers via Consul agent API, uses TTL checks
- **Agentless mode** (`enable_agentless=true`): Registers via catalog API, uses session-based health checks

**Leader Election (`leader.go`)**
- Uses Consul KV lock at `consul-esm/leader` key
- Leader watches external nodes and distributes them across healthy ESM instances
- Node assignment stored in KV at `consul-esm/agents/<service-id>`
- Balances nodes round-robin across all healthy ESM instances with same service name/tag
- Nodes with `"external-probe": "true"` metadata are pinged for coordinates

**CheckRunner (`check.go`)**
- Executes HTTP and TCP health checks for assigned external nodes
- Supports threshold-based anti-flapping (`passing_threshold`, `critical_threshold`)
- Uses Consul's `CheckHTTP` and `CheckTCP` implementations
- Updates check status via Consul transactions (CAS operations)
- **Agentless mode optimization**: Batches check updates to reduce HTTP connections

**Batcher (`batcher.go`)**
- Used in agentless mode to batch check updates
- Collects updates and flushes at regular intervals or when batch size limit reached
- Reduces server load by combining multiple check updates into single transactions
- Handles retry logic for stale index errors

**Coordinate Updates (`coordinate.go`)**
- Pings external nodes marked with `"external-probe": "true"`
- Updates network coordinates in Consul for RTT calculations
- Supports UDP (default) or ICMP socket pings
- Also manages `externalNodeHealth` check based on ping success/failure

### Data Flow

1. **Leader Election**: One ESM instance acquires leader lock
2. **Node Distribution**: Leader watches external nodes and assigns them to ESM instances via KV
3. **Node Watching**: Each ESM instance watches its assigned node list at `consul-esm/agents/<service-id>`
4. **Health Checks**: ESM fetches health checks for assigned nodes and runs HTTP/TCP checks
5. **Status Updates**: Check results update Consul catalog (batched in agentless mode)
6. **Coordinate Updates**: ESM pings nodes and updates coordinates (if enabled)

### Key Files

- `main.go` - CLI parsing, logging setup, agent initialization
- `agent.go` - Agent lifecycle, registration, goroutine coordination
- `leader.go` - Leader election, node assignment distribution
- `check.go` - Health check execution and status updates
- `coordinate.go` - Network coordinate pinging and updates
- `config.go` - Configuration parsing (HCL/JSON)
- `batcher.go` - Check update batching for agentless mode

## Important Concepts

### External Nodes
External nodes are identified by node metadata `"external-node": "true"`. These are nodes registered directly in the Consul catalog (not running Consul agents) that ESM monitors.

### Node Probing
Nodes with `"external-probe": "true"` metadata are actively pinged by ESM to:
- Update network coordinates for RTT calculations
- Maintain an `externalNodeHealth` check (similar to Consul's `serfHealth`)

### Agentless Mode
When `enable_agentless=true`:
- ESM registers as a catalog service instead of agent service
- Uses session-based health checks instead of TTL checks
- Batches check updates to reduce HTTP connections to Consul servers
- More efficient for large-scale deployments

### Leader vs Follower Behavior
- **Leader**: Distributes external nodes across all healthy ESM instances
- **Follower**: Monitors assigned nodes and runs health checks

### Check Hash
Checks are uniquely identified by hash: `{node}/{service-id}/{check-id}` or `{node}/{check-id}` for node checks.

### Transaction Limits
Consul supports max 64 operations per transaction (`maxTxnOps`). Batching logic handles splitting larger batches.

## Configuration Notes

- Config files can be HCL or JSON format
- Multiple config files/directories can be specified
- Environment variables supported: `CONSUL_HTTP_ADDR`, `CONSUL_HTTP_TOKEN`, `CONSUL_ENABLEAGENTLESS`, `CONSUL_PARTITION`
- Default service name: `consul-esm`
- Default KV path: `consul-esm/`

## Testing Patterns

- Tests use `testutil.TestServer` from Consul for test Consul servers
- `agent_test.go` - Agent lifecycle, registration, leader election tests
- `check_test.go` - Health check runner tests
- `leader_test.go` - Node distribution and balancing tests
- `agentless_optimization_test.go` - Agentless mode batching tests

## Metrics

ESM exposes Prometheus metrics at `/metrics` endpoint (when `client_address` is configured):
- `esm_agent_isLeader` - Leader status (1 or 0)
- `esm_nodes_monitored` - Count of monitored external nodes
- `esm_services_monitored` - Count of monitored external services
- `esm_checks_*` - Check counts and health
- `esm_agents_healthy` - Count of healthy ESM instances (leader only)
17 changes: 11 additions & 6 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (a *Agent) Run() error {
}()

a.ready <- struct{}{} // used for testing
defer func() { // be sure to drain it between calls
defer func() { // be sure to drain it between calls
select {
case <-a.ready:
default:
Expand Down Expand Up @@ -766,7 +766,10 @@ func (a *Agent) watchNodeList() {
pingNodes[node] = true
}

nodes, _, err := a.client.Catalog().Nodes(&api.QueryOptions{NodeMeta: a.config.NodeMeta})
nodes, _, err := a.client.Catalog().Nodes(&api.QueryOptions{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still a partial solution. Instead of passing the NodeMeta, we should actually pass the pingNodes to the filter. Otherwise, we are still retrieving all the Nodes !
But, that would create an issue at scale. Lets say 200 number of Nodes.
So, the real fix would be Consul should expose a dedicated API to retrieve all the nodes corresponds to that specific ESM instance, as Consul Server can read from the KV store and filter only the relevant nodes. cc @shashankNandigama

This looks good for now.

Copy link
Copy Markdown
Author

@aliciay64 aliciay64 Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, if you can implement this as a new API on the consul side, that would be more straightforward, also saves the retrieval from KV store.

One concern is that this new API will still put more load on consul leader, so we want to enable stale reads in ESM when switching to that API. I'll make the stale reads on nodes a configurable option

Can you also link the Issue/PR for that new Consul API here? Thanks!

NodeMeta: a.config.NodeMeta,
AllowStale: a.config.StaleReadNodes,
})
if err != nil {
a.logger.Warn("Error querying for node list", "error", err)
continue
Expand Down Expand Up @@ -874,11 +877,13 @@ func (a *Agent) getHealthChecks(waitIndex uint64, nodes map[string]bool) (api.He
// the typical ALB 60s idle timeout, ensuring Consul responds before the LB
// can kill the connection.
opts := &api.QueryOptions{
NodeMeta: a.config.NodeMeta,
WaitIndex: waitIndex,
Namespace: a.getNamespaceWildcard(),
WaitTime: 45 * time.Second,
NodeMeta: a.config.NodeMeta,
WaitIndex: waitIndex,
Namespace: a.getNamespaceWildcard(),
WaitTime: 45 * time.Second,
AllowStale: a.config.StaleReadNodes,
}

opts = opts.WithContext(ctx)
a.HasPartition(func(partition string) {
opts.Partition = partition
Expand Down
82 changes: 82 additions & 0 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,88 @@ func TestAgent_getHealthChecks_filtersCorrectly(t *testing.T) {
assert.Contains(t, checkIDs, "svc-check-2")
})

t.Run("uses-stale-reads-when-configured", func(t *testing.T) {
listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
port := listener.Addr().(*net.TCPAddr).Port
addr := fmt.Sprintf("127.0.0.1:%d", port)

var receivedStale bool

checks := api.HealthChecks{
{Node: "node1", CheckID: "check-1", Name: "check-1", Status: api.HealthPassing},
}
checksJSON, _ := json.Marshal(checks)

ts := httptest.NewUnstartedServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.EscapedPath() {
case "/v1/status/leader":
fmt.Fprint(w, `"`+addr+`"`)
case "/v1/agent/self":
fmt.Fprint(w, testAgentSelfOSS())
case "/v1/health/state/any":
receivedStale = r.URL.Query().Has("stale")
fmt.Fprint(w, string(checksJSON))
default:
}
}))
ts.Listener = listener
ts.Start()
defer ts.Close()

agent := testAgent(t, func(c *Config) {
c.HTTPAddr = addr
c.StaleReadNodes = true
})
defer agent.Shutdown()

ourNodes := map[string]bool{"node1": true}
_, _ = agent.getHealthChecks(0, ourNodes)

assert.True(t, receivedStale, "Should enable stale reads when StaleReadNodes=true")
})

t.Run("no-stale-reads-when-disabled", func(t *testing.T) {
listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
port := listener.Addr().(*net.TCPAddr).Port
addr := fmt.Sprintf("127.0.0.1:%d", port)

var receivedStale bool

checks := api.HealthChecks{}
checksJSON, _ := json.Marshal(checks)

ts := httptest.NewUnstartedServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.EscapedPath() {
case "/v1/status/leader":
fmt.Fprint(w, `"`+addr+`"`)
case "/v1/agent/self":
fmt.Fprint(w, testAgentSelfOSS())
case "/v1/health/state/any":
receivedStale = r.URL.Query().Has("stale")
fmt.Fprint(w, string(checksJSON))
default:
}
}))
ts.Listener = listener
ts.Start()
defer ts.Close()

agent := testAgent(t, func(c *Config) {
c.HTTPAddr = addr
c.StaleReadNodes = false
})
defer agent.Shutdown()

ourNodes := map[string]bool{"node1": true}
_, _ = agent.getHealthChecks(0, ourNodes)

assert.False(t, receivedStale, "Should not use stale reads when StaleReadNodes=false")
})

t.Run("returns-empty-on-api-error", func(t *testing.T) {
listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Config struct {
PingType string

DisableCoordinateUpdates bool
StaleReadNodes bool

Telemetry lib.TelemetryConfig

Expand Down Expand Up @@ -144,6 +145,7 @@ func DefaultConfig() (*Config, error) {
NodeReconnectTimeout: 72 * time.Hour,
PingType: PingTypeUDP,
DisableCoordinateUpdates: false,
StaleReadNodes: false,
Partition: "",
LogFile: "",
LogRotateBytes: 0,
Expand Down
39 changes: 39 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/lib"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDecodeMergeConfig(t *testing.T) {
Expand Down Expand Up @@ -373,6 +374,44 @@ func TestPartition(t *testing.T) {
}
}

func TestStaleReadNodesConfig(t *testing.T) {
cases := []struct {
name string
config string
expected bool
}{
{
name: "defaults to false when not set",
config: "",
expected: false,
},
{
name: "can be disabled via config",
config: `stale_read_nodes = false`,
expected: false,
},
{
name: "can be explicitly enabled via config",
config: `stale_read_nodes = true`,
expected: true,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result, err := DefaultConfig()
require.NoError(t, err)

humanConfig, err := DecodeConfig(bytes.NewBufferString(tc.config))
require.NoError(t, err)

MergeConfig(result, humanConfig)

assert.Equal(t, tc.expected, result.StaleReadNodes)
})
}
}

func stringPointer(s string) *string {
if len(s) == 0 {
return nil
Expand Down