diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml new file mode 100644 index 0000000..d671b36 --- /dev/null +++ b/.github/workflows/e2e.yml @@ -0,0 +1,160 @@ +name: E2E Smoke Tests + +on: + pull_request: + branches: [master] + push: + branches: [master] + workflow_dispatch: + inputs: + backend: + description: 'Backend to test against' + required: false + default: 'thanos' + type: choice + options: + - thanos + - prometheus + +jobs: + e2e: + runs-on: ubuntu-latest + timeout-minutes: 20 + strategy: + fail-fast: false + matrix: + backend: [thanos, prometheus] + steps: + - uses: actions/checkout@v4 + + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 17 + cache: maven + + - name: Install sshpass + run: sudo apt-get install -y sshpass + + - name: Build plugin + run: mvn clean install -DskipTests + + - name: Deploy KAR + run: cp assembly/kar/target/opennms-cortex-tss-plugin.kar e2e/opennms-overlay/deploy/ + + - name: Start ${{ matrix.backend }} stack + working-directory: e2e + run: | + docker compose --profile ${{ matrix.backend }} up -d + echo "Waiting for OpenNMS..." + for i in $(seq 1 60); do + STATUS=$(curl -s -o /dev/null -w '%{http_code}' -u admin:admin http://localhost:8980/opennms/rest/info 2>/dev/null || echo "000") + if [ "$STATUS" = "200" ]; then + echo "OpenNMS is up after ~${i}0s" + break + fi + sleep 10 + done + + - name: Verify all containers are running + run: | + echo "=== Container status ===" + docker ps -a --format 'table {{.Names}}\t{{.Status}}' + echo "" + # Fail fast if any e2e container exited + EXITED=$(docker ps -a --filter "status=exited" --format '{{.Names}}' | grep e2e || true) + if [ -n "$EXITED" ]; then + echo "ERROR: Containers have exited: $EXITED" + for C in $EXITED; do + echo "=== Logs: $C ===" + docker logs "$C" 2>&1 | tail -20 + done + exit 1 + fi + echo "All containers running" + + - name: Install Cortex plugin feature + run: | + ssh-keygen -R "[localhost]:8101" 2>/dev/null || true + # Wait for Karaf SSH to be ready, then install the feature + for attempt in $(seq 1 12); do + sshpass -p admin ssh -o StrictHostKeyChecking=no -o LogLevel=ERROR -p 8101 admin@localhost \ + "feature:install opennms-plugins-cortex-tss" 2>&1 && break + sleep 5 + done + # Wait for the feature to fully start (KAR extraction + bundle activation) + echo "Waiting for health check to pass..." + for attempt in $(seq 1 24); do + HEALTH=$(sshpass -p admin ssh -o StrictHostKeyChecking=no -o LogLevel=ERROR -p 8101 admin@localhost \ + "opennms:health-check" 2>&1 || true) + echo "$HEALTH" + if echo "$HEALTH" | grep -q "Everything is awesome"; then + echo "Health check passed on attempt $attempt" + exit 0 + fi + sleep 5 + done + echo "Health check did not pass within 2 minutes" + exit 1 + + - name: Wait for metrics + run: | + echo "Waiting for metrics to flow (up to 5 minutes)..." + for i in $(seq 1 60); do + COUNT=$(curl -s http://localhost:9090/api/v1/label/__name__/values 2>/dev/null \ + | python3 -c "import sys,json; print(len(json.load(sys.stdin)['data']))" 2>/dev/null || echo "0") + if [ "$COUNT" -gt "10" ]; then + echo "Got $COUNT metrics after ~${i}x5s" + exit 0 + fi + sleep 5 + done + echo "ERROR: No metrics after 5 minutes" + exit 1 + + - name: Run smoke tests + working-directory: e2e + run: bash smoke-test.sh --backend ${{ matrix.backend }} + + - name: Collect logs on failure + if: failure() + working-directory: e2e + run: | + echo "=== All containers (including exited) ===" + docker ps -a --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}' + echo "" + + # Show logs for ALL e2e containers, not just opennms + for CONTAINER in $(docker ps -a --format '{{.Names}}' | grep -E "e2e-"); do + echo "=== Logs: $CONTAINER ===" + docker logs "$CONTAINER" 2>&1 | tail -50 + echo "" + done + + OPENNMS=$(docker ps -a --format '{{.Names}}' | grep opennms | head -1) + if [ -n "$OPENNMS" ]; then + echo "=== Karaf log ===" + docker exec "$OPENNMS" cat /opt/opennms/logs/karaf.log 2>/dev/null | tail -100 + fi + + - name: Tear down + if: always() + working-directory: e2e + run: docker compose --profile ${{ matrix.backend }} down -v 2>/dev/null || true + + unit-tests: + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - uses: actions/checkout@v4 + + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 17 + cache: maven + + - name: Run unit tests + run: mvn clean install diff --git a/.gitignore b/.gitignore index c59975e..d4c444e 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,9 @@ java/bin/ *.swp *.iml +# E2E test artifacts +*.kar +e2e/opennms-overlay/etc/org.opennms.plugins.tss.cortex.cfg + +# Tool workspace artifacts +docs/superpowers/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..0f8a26f --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,74 @@ +# CLAUDE.md + +## Project Overview + +Cortex TSS Plugin for OpenNMS — a `TimeSeriesStorage` implementation that stores metrics in Cortex/Prometheus-compatible backends via remote write protocol. Deployed as a KAR file into OpenNMS Karaf. + +- **Package**: `org.opennms.timeseries.cortex` +- **Core classes**: `CortexTSS` (implements `TimeSeriesStorage`), `CortexTSSConfig`, `ResultMapper` +- **Build**: `mvn clean install` (standard Maven, Java 17) +- **Unit tests**: `mvn test` — 14 tests (CortexTSSTest + ResultMapperTest) +- **KAR output**: `assembly/kar/target/opennms-cortex-tss-plugin.kar` +- **License**: AGPL v3 — all Java files MUST have the license header + +## E2E Test Harness + +Located in `e2e/`. Uses docker-compose with Prometheus or Thanos backends. + +### One-command E2E (preferred) +```bash +# Builds plugin, deploys KAR, starts stack, installs feature, waits for data, runs 45 tests, tears down: +./e2e/run-e2e.sh --backend thanos + +# Skip rebuild if KAR already exists: +./e2e/run-e2e.sh --backend thanos --no-build + +# Keep stack running after tests (for debugging): +./e2e/run-e2e.sh --backend thanos --no-teardown +``` + +### Manual steps (if needed) +```bash +cd e2e +docker-compose --profile thanos up -d +# (wait for OpenNMS + install plugin feature + wait for data) +./smoke-test.sh --backend thanos +``` + +### CI +GitHub Actions runs E2E on every PR against both Prometheus and Thanos backends. See `.github/workflows/e2e.yml`. + +### Critical Rules for E2E Infrastructure + +1. **Pin all image versions explicitly.** Never use `:latest` tags. Never reference `localhost/` images. Every image in docker-compose.yml must use a specific released version tag (e.g., `opennms/horizon:35.0.4`, `thanosio/thanos:v0.35.1`). + +2. **Develop and test against released OpenNMS versions only.** Never develop against SNAPSHOT builds. The E2E harness must pass against the current stable release. If a feature requires unreleased OpenNMS behavior, the test must be marked as `skip` with a comment noting the required version. + +3. **Tests must assert observable behavior.** Test assertions must check API responses, config file contents, metric data, HTTP status codes, or feature status. Never grep log files for specific messages — log output is an implementation detail that varies by version and log configuration. + +4. **Collection intervals must be 30 seconds for testing.** Override the default 300s interval in overlay configs. Waiting 5 minutes per collection cycle during E2E is unacceptable. + +5. **KAR must be pre-built and placed in `e2e/opennms-overlay/deploy/`.** The E2E harness does not build the plugin — it deploys a pre-built KAR. After `mvn install`, copy the KAR: + ```bash + cp assembly/kar/target/opennms-cortex-tss-plugin.kar e2e/opennms-overlay/deploy/ + ``` + +6. **The Cortex plugin feature must be explicitly installed** after OpenNMS starts. The KAR auto-deploys and registers the feature repo, but the feature itself needs `feature:install opennms-plugins-cortex-tss` via Karaf SSH (port 8101, admin/admin). + +### Docker Compose Gotchas (CI vs Local) + +The E2E harness runs on both podman (macOS local) and Docker (GitHub Actions CI). Key differences: + +- **Volume permissions**: Docker named volumes are root-owned. Images running as non-root (e.g., Thanos runs as uid 1001) will get `permission denied`. Fix: `user: "0:0"` in docker-compose.yml for affected services. +- **Container naming**: podman-compose uses `_` separators (`e2e_opennms_1`), Docker Compose v2 uses `-` separators (`e2e-opennms-1`). Scripts must handle both: `grep -E "[_-]opennms"`. +- **`set -e` + `grep -c`**: `grep -c` returns exit code 1 on zero matches, which kills `set -e` scripts. Use `|| true` on grep commands, or avoid `set -e`. +- **Matrix `fail-fast`**: GitHub Actions matrix defaults to `fail-fast: true`, canceling sibling jobs on first failure. Always set `fail-fast: false` for independent E2E profiles. +- **Always collect ALL container logs on failure** — not just OpenNMS. A crashed sidecar (thanos-receive, postgres) can cause misleading symptoms (DNS failures, connection refused). + +## Key Conventions + +- Config PID: `org.opennms.plugins.tss.cortex` — properties go in `.cfg` files +- Blueprint XML wires the OSGi service — constructor args must match `CortexTSSConfig` +- Wire protocol: Prometheus remote write (protobuf + Snappy) for writes; Prometheus HTTP API for reads +- The pre-existing `FIXME: Data loss` in `CortexTSS.java` is an upstream issue — do not remove or "fix" it without addressing the actual retry/backpressure problem +- Never remove code outside the scope of the current task. If something is brittle, fix it — don't delete it. diff --git a/MERMAID.md b/MERMAID.md new file mode 100644 index 0000000..9ee1b7e --- /dev/null +++ b/MERMAID.md @@ -0,0 +1,429 @@ +# Cortex TSS Plugin - System Architecture + +> Visual architecture of the OpenNMS Cortex TSS Plugin. +> See [PLUGIN-ARCHITECTURE.md](PLUGIN-ARCHITECTURE.md) for detailed documentation. + +--- + +## 1. High-Level Overview + +How the plugin fits between OpenNMS and a Prometheus-compatible backend. + +```mermaid +graph LR + subgraph OpenNMS["OpenNMS Horizon"] + COLL["Collectors
(Collectd, Pollerd,
Telemetryd)"] + REST["REST APIs
(/measurements,
/resources)"] + end + + subgraph Plugin["Cortex TSS Plugin"] + TSS["CortexTSS
TimeSeriesStorage"] + end + + subgraph Backend["Prometheus-Compatible Backend"] + PROM["Prometheus"] + THANOS["Thanos"] + CORTEX["Cortex / Mimir"] + end + + COLL -->|"samples"| TSS + REST -->|"queries"| TSS + TSS -->|"Protobuf + Snappy
(remote write)"| PROM + TSS -->|"Protobuf + Snappy
(remote write)"| THANOS + TSS -->|"Protobuf + Snappy
(remote write)"| CORTEX + PROM -->|"PromQL JSON"| TSS + THANOS -->|"PromQL JSON"| TSS + CORTEX -->|"PromQL JSON"| TSS + + classDef core fill:#e8f4fd,stroke:#2196F3,color:#000 + classDef plugin fill:#fff3e0,stroke:#FF9800,color:#000 + classDef backend fill:#e8f5e9,stroke:#4CAF50,color:#000 + + class COLL,REST core + class TSS plugin + class PROM,THANOS,CORTEX backend +``` + +--- + +## 2. Write Path + +How a metric sample flows from OpenNMS collectors to the backend. + +```mermaid +sequenceDiagram + participant C as Collectors + participant TSM as TimeseriesStorageManager + participant TSS as CortexTSS.store() + participant B as Bulkhead + participant HTTP as OkHttp + participant BE as Backend + + C->>TSM: List + TSM->>TSS: store(samples) + + Note over TSS: Filter NaN values + Note over TSS: Sort by timestamp + Note over TSS: Sanitize metric + label names + Note over TSS: Sort labels lexicographically + + TSS->>TSS: persistExternalTags() → KeyValueStore + + Note over TSS: Build Protobuf WriteRequest + Note over TSS: Snappy compress + + TSS->>B: executeCompletionStage() + B->>HTTP: async POST + + HTTP->>BE: POST /api/v1/write
Content-Type: x-protobuf
Content-Encoding: snappy
X-Scope-OrgID: {orgId} + + alt Success (HTTP 2xx) + BE-->>HTTP: 200 OK + HTTP-->>TSS: samplesWritten.mark() + else Failure + BE-->>HTTP: error / timeout + HTTP-->>TSS: samplesLost.mark() + Note over TSS: Log error, continue
(non-blocking) + end +``` + +--- + +## 3. Read Path + +How a measurements query flows from the REST API through to PromQL. + +```mermaid +sequenceDiagram + participant UI as OpenNMS UI / REST + participant TSM as TimeseriesStorageManager + participant TSS as CortexTSS.getTimeseries() + participant Cache as Metric Cache (Guava) + participant HTTP as OkHttp + participant BE as Backend + participant RM as ResultMapper + + UI->>TSM: GET /rest/measurements + TSM->>TSS: getTimeseries(request) + + TSS->>Cache: lookup metric metadata + alt Cache miss + Cache-->>TSS: miss + TSS->>BE: findMetrics() → /series + BE-->>TSS: metric with mtype, tags + TSS->>Cache: populate + else Cache hit + Cache-->>TSS: metric + end + + Note over TSS: Build PromQL selector
{resourceId="...", __name__="..."} + + alt mtype = counter + Note over TSS: Wrap with rate({...}[interval]) + end + + alt aggregation requested + Note over TSS: Wrap with avg/max/min(...) + end + + Note over TSS: Step = max(1, ceil(duration / 1200)) + + TSS->>HTTP: GET /query_range?query={PromQL}&start=...&end=...&step=... + HTTP->>BE: PromQL range query + BE-->>HTTP: JSON matrix result + HTTP-->>TSS: response body + + TSS->>RM: fromRangeQueryResult(json, metric) + RM-->>TSS: List + TSS-->>UI: time series data +``` + +--- + +## 4. Metric Discovery + +How `findMetrics()` routes between the standard path and the Thanos-optimized two-phase path. + +```mermaid +flowchart TB + START["findMetrics(tagMatchers)"] --> CHECK{"useLabelValuesForDiscovery
AND wildcard regex
on resourceId?"} + + CHECK -->|no| STANDARD + CHECK -->|yes| TWOPHASE + + subgraph STANDARD["Standard Path"] + direction TB + S1["GET /series?match[]={query}
with regex matchers"] + S2["ResultMapper.fromSeriesQueryResult()"] + S3["Append external tags
from KeyValueStore"] + S1 --> S2 --> S3 + end + + subgraph TWOPHASE["Two-Phase Path (Thanos Optimized)"] + direction TB + P1["Phase 1
GET /label/resourceId/values?match[]={query}
Index-only scan — no chunk decompression"] + P2["Chunk into batches
(size = discoveryBatchSize)"] + P3["Phase 2
GET /series?match[]={resourceId=~'^(id1|id2|...)$'}
Exact-match alternation — index lookup"] + P4["Append external tags
from KeyValueStore"] + P1 --> P2 --> P3 --> P4 + end + + STANDARD --> RESULT["List<Metric>
with intrinsic + meta + external tags"] + TWOPHASE --> RESULT + + classDef decision fill:#fff3e0,stroke:#FF9800,color:#000 + classDef phase1 fill:#e8f5e9,stroke:#4CAF50,color:#000 + classDef phase2 fill:#e8f4fd,stroke:#2196F3,color:#000 + + class CHECK decision + class P1 phase1 + class P3 phase2 +``` + +--- + +## 5. Infrastructure + +Caching, external tags persistence, concurrency control, and observability. + +```mermaid +graph TB + subgraph Caching["Caches (Guava)"] + MC["Metric Cache
max: metricCacheSize
Avoids repeat /series lookups
on read path
"] + ETC["External Tags Cache
max: externalTagsCacheSize
Preloaded from KV store
on startup
"] + end + + subgraph Concurrency["Concurrency Control"] + DISP["OkHttp Dispatcher
maxRequests: maxConns
maxRequestsPerHost: maxConns"] + POOL["Connection Pool
maxIdle: maxConns
keepAlive: 5 min"] + BULKHEAD["Resilience4j Bulkhead
permits: maxConns x 4
fair FIFO ordering"] + + DISP --- POOL + BULKHEAD -->|"gates writes"| DISP + end + + subgraph ExtTags["External Tags Persistence"] + KVS["KeyValueStore
context: CORTEX_TSS"] + WRITE_ET["Write path:
persistExternalTags()
additive-only upsert"] + READ_ET["Read path:
appendExternalTags()
merge into Metric"] + + WRITE_ET --> ETC + READ_ET --> ETC + ETC <-->|"putAsync / get"| KVS + end + + subgraph Metrics["Observability (Dropwizard)"] + RATES["Rates (events/sec)
samplesWritten
samplesLost
extTagsModified
extTagPutTransactionFailed"] + GAUGES["Gauges (current state)
connectionCount
idleConnectionCount
queuedCallsCount
runningCallsCount
availableConcurrentCalls"] + SHELL["Karaf Shell
opennms-cortex:stats
opennms-cortex:query-metrics"] + + SHELL --> RATES + SHELL --> GAUGES + end + + classDef cache fill:#fce4ec,stroke:#E91E63,color:#000 + classDef concurrency fill:#f3e5f5,stroke:#9C27B0,color:#000 + classDef storage fill:#e8f4fd,stroke:#2196F3,color:#000 + classDef metrics fill:#fff3e0,stroke:#FF9800,color:#000 + + class MC,ETC cache + class DISP,POOL,BULKHEAD concurrency + class KVS,WRITE_ET,READ_ET storage + class RATES,GAUGES,SHELL metrics +``` + +--- + +## 6. E2E Test Stack + +The Docker/Podman Compose smoke test environment (profiles are mutually exclusive). + +```mermaid +graph TB + subgraph Tests["smoke-test.sh (45 tests)"] + T["12 sections:
lifecycle, write, read, meta tags,
sanitization, labels, discovery,
two-phase, REST API, measurements,
data consistency, health"] + end + + subgraph PromProfile["--profile prometheus"] + PP["PostgreSQL
:5432"] + PR["Prometheus
:9090
--web.enable-remote-write-receiver"] + PO["OpenNMS Horizon
:8980 / :8101
+ Cortex TSS Plugin"] + PO -->|"remote write"| PR + PO -->|"query API"| PR + PO --> PP + end + + subgraph ThanosProfile["--profile thanos"] + TP["PostgreSQL
:5432"] + TR["Thanos Receive
:19291 write
:10901 gRPC"] + TQ["Thanos Query
:10902 internal
:9090 external"] + TO["OpenNMS Horizon
:8980 / :8101
+ Cortex TSS Plugin"] + TO -->|"remote write :19291"| TR + TO -->|"query API :10902"| TQ + TR -->|"gRPC store"| TQ + TO --> TP + end + + T -->|"REST + Prom API"| PO + T -->|"REST + Prom API"| TO + + classDef test fill:#fff3e0,stroke:#FF9800,color:#000 + classDef prom fill:#e8f5e9,stroke:#4CAF50,color:#000 + classDef thanos fill:#e8f4fd,stroke:#2196F3,color:#000 + + class T test + class PP,PR,PO prom + class TP,TR,TQ,TO thanos +``` + +--- + +
+Full System Diagram (click to expand) + +All components and connections in a single view. + +```mermaid +graph TB + %% ── OpenNMS Core ────────────────────────────────────────── + subgraph OpenNMS["OpenNMS Horizon"] + direction TB + COLL["Collectd / Pollerd / Telemetryd
(data collectors)"] + TSM["TimeseriesStorageManager
(strategy=INTEGRATION)"] + MAPI["Measurements REST API
GET /rest/measurements"] + RAPI["Resources REST API
GET /rest/resources"] + KVS["KeyValueStore
(OSGi service)"] + + COLL -->|"List<Sample>"| TSM + MAPI -->|"TimeSeriesFetchRequest"| TSM + RAPI -->|"findMetrics()"| TSM + end + + %% ── Plugin Internals ────────────────────────────────────── + subgraph Plugin["CortexTSS Plugin (OSGi Bundle)"] + direction TB + + subgraph WritePath["Write Path"] + direction TB + STORE["store(List<Sample>)"] + FILT["Filter NaN values"] + SORT["Sort by timestamp"] + SANW["Sanitize metric &
label names"] + LBLSORT["Sort labels
lexicographically"] + PROTO["Build Protobuf
WriteRequest"] + SNAP["Snappy compress"] + BULK["Resilience4j Bulkhead
(maxConns x 4 permits)"] + HTTP_W["OkHttp async POST
Content-Type: x-protobuf
Content-Encoding: snappy
X-Scope-OrgID: orgId"] + + STORE --> FILT --> SORT --> SANW --> LBLSORT --> PROTO --> SNAP --> BULK --> HTTP_W + end + + subgraph ReadPath["Read Path"] + direction TB + GTS["getTimeseries()"] + MCACHE["Metric Cache
(Guava)"] + PQBUILD["Build PromQL query"] + RATE["Wrap rate() if counter"] + AGG["Wrap avg/max/min
if aggregation"] + STEPCALC["Step = max(1,
ceil(duration/1200))"] + HTTP_R["OkHttp GET
/query_range"] + RMAP["ResultMapper
parse JSON to List<Sample>"] + + GTS --> MCACHE --> PQBUILD --> RATE --> AGG --> STEPCALC --> HTTP_R --> RMAP + end + + subgraph Discovery["Metric Discovery"] + direction TB + FM["findMetrics(TagMatchers)"] + ROUTE{"useLabelValues
AND wildcard?"} + + subgraph Standard["Standard Path"] + SER["GET /series?match[]=..."] + SPAR["ResultMapper
parse series JSON"] + SER --> SPAR + end + + subgraph TwoPhase["Two-Phase Path (Thanos Optimized)"] + direction TB + PH1["Phase 1: GET /label/resourceId/values
(index-only scan)"] + PH2["Phase 2: Batch /series queries
regex: ^(id1|id2|...)$
(exact-match index lookup)"] + PH1 --> PH2 + end + + FM --> ROUTE + ROUTE -->|no| SER + ROUTE -->|yes| PH1 + end + + subgraph ExtTags["External Tags"] + direction TB + ETCACHE["External Tags Cache
(Guava)"] + ETPERS["persistExternalTags()"] + ETAPPEND["appendExternalTags()
(read path)"] + end + + subgraph Observability["Observability"] + METRICS["Dropwizard MetricRegistry
samplesWritten | samplesLost
connectionCount | queuedCalls
availableConcurrentCalls"] + STATS["Karaf: opennms-cortex:stats"] + MQUERY["Karaf: opennms-cortex:query-metrics"] + end + end + + %% ── Backends ────────────────────────────────────────────── + subgraph Backends["Prometheus-Compatible Backend"] + direction TB + subgraph PromStack["Prometheus"] + PWRITE["POST /api/v1/write
(remote write receiver)"] + PREAD["GET /api/v1/query_range
GET /api/v1/series
GET /api/v1/label/*/values"] + end + subgraph ThanosStack["Thanos"] + TRECV["Thanos Receive
POST :19291/api/v1/receive"] + TQUERY["Thanos Query
GET :10902/api/v1/*"] + TRECV ---|"gRPC :10901"| TQUERY + end + end + + %% ── Connections ─────────────────────────────────────────── + TSM -->|"store()"| STORE + TSM -->|"getTimeseries()"| GTS + TSM -->|"findMetrics()"| FM + + KVS <-->|"put/get context=CORTEX_TSS"| ETCACHE + STORE -.->|"persist tags"| ETPERS + ETPERS --> ETCACHE + SPAR -.->|"augment metrics"| ETAPPEND + ETAPPEND --> ETCACHE + PH2 -.->|"augment metrics"| ETAPPEND + + HTTP_W -->|"Protobuf + Snappy"| PWRITE + HTTP_W -->|"Protobuf + Snappy"| TRECV + HTTP_R --> PREAD + HTTP_R --> TQUERY + SER --> PREAD + SER --> TQUERY + PH1 --> PREAD + PH1 --> TQUERY + PH2 --> PREAD + PH2 --> TQUERY + + RMAP -->|"List<Sample>"| MAPI + SPAR -->|"List<Metric>"| RAPI + PH2 -->|"List<Metric>"| RAPI + + HTTP_W -.->|"mark()"| METRICS + STATS --> METRICS + MQUERY --> FM + + %% ── Styling ─────────────────────────────────────────────── + classDef core fill:#e8f4fd,stroke:#2196F3,color:#000 + classDef plugin fill:#fff3e0,stroke:#FF9800,color:#000 + classDef backend fill:#e8f5e9,stroke:#4CAF50,color:#000 + classDef cache fill:#fce4ec,stroke:#E91E63,color:#000 + + class COLL,TSM,MAPI,RAPI,KVS core + class STORE,FILT,SORT,PROTO,SANW,LBLSORT,SNAP,BULK,HTTP_W,GTS,PQBUILD,RATE,AGG,STEPCALC,HTTP_R,RMAP,FM,ROUTE,SER,SPAR,PH1,PH2,ETPERS,ETAPPEND,METRICS,STATS,MQUERY plugin + class PWRITE,PREAD,TRECV,TQUERY backend + class MCACHE,ETCACHE cache +``` + +
diff --git a/PLUGIN-ARCHITECTURE.md b/PLUGIN-ARCHITECTURE.md new file mode 100644 index 0000000..f7dfb7c --- /dev/null +++ b/PLUGIN-ARCHITECTURE.md @@ -0,0 +1,905 @@ +# OpenNMS Cortex TSS Plugin - Architecture & Logic Reference + +> Comprehensive documentation of the `opennms-cortex-tss-plugin`, a Prometheus-compatible time series storage backend for OpenNMS. + +**Version**: 2.0.7-SNAPSHOT +**Package**: `org.opennms.timeseries.cortex` +**License**: AGPL v3 +**Repository**: [OpenNMS/opennms-cortex-tss-plugin](https://github.com/OpenNMS/opennms-cortex-tss-plugin) + +--- + +## System Diagram + +See **[MERMAID.md](MERMAID.md)** for the full architecture diagram covering write path, read path, discovery routing, external tags, observability, and backend connectivity. + +--- + +## Table of Contents + +1. [Overview](#overview) +2. [Module Structure](#module-structure) +3. [Core Components](#core-components) +4. [Write Path](#write-path) +5. [Read Path](#read-path) +6. [Label Values Discovery (Thanos Optimization)](#label-values-discovery) +7. [External Tags Persistence](#external-tags-persistence) +8. [Concurrency & Resilience](#concurrency--resilience) +9. [Configuration Reference](#configuration-reference) +10. [OSGi & Karaf Integration](#osgi--karaf-integration) +11. [Karaf Shell Commands](#karaf-shell-commands) +12. [Sanitization Rules](#sanitization-rules) +13. [Observable Metrics](#observable-metrics) +14. [Error Handling & Failure Modes](#error-handling--failure-modes) +15. [Testing Infrastructure](#testing-infrastructure) +16. [E2E Smoke Test Harness](#e2e-smoke-test-harness) +17. [Performance Tuning](#performance-tuning) +18. [Known Limitations](#known-limitations) +19. [Dependencies](#dependencies) + +--- + +## Overview + +The Cortex TSS Plugin implements the OpenNMS Integration API `TimeSeriesStorage` interface, enabling OpenNMS to store and retrieve time series metrics in any Prometheus-compatible backend (Cortex, vanilla Prometheus, Thanos, Mimir, etc.). + +**Data flow summary:** + +``` +OpenNMS Collectors/Pollers + │ + ▼ +TimeseriesStorageManager (core) + │ + ▼ +CortexTSS.store() CortexTSS.getTimeseries() + │ │ + ▼ ▼ +Protobuf + Snappy encode PromQL query build + │ │ + ▼ ▼ +POST /api/v1/write GET /api/v1/query_range +(Prometheus Remote Write) (Prometheus HTTP API) + │ │ + ▼ ▼ +Cortex / Prometheus / Thanos Cortex / Prometheus / Thanos +``` + +To activate in OpenNMS, set `org.opennms.timeseries.strategy=integration` in `opennms.properties.d/`. + +--- + +## Module Structure + +``` +cortex-parent/ +├── plugin/ # Core plugin (OSGi bundle) +│ ├── src/main/java/.../cortex/ +│ │ ├── CortexTSS.java # TimeSeriesStorage implementation (~760 lines) +│ │ ├── CortexTSSConfig.java # Immutable config with builder pattern +│ │ ├── ResultMapper.java # Prometheus JSON response parsing +│ │ └── shell/ +│ │ ├── MetricQuery.java # Karaf: opennms-cortex:query-metrics +│ │ └── Stats.java # Karaf: opennms-cortex:stats +│ ├── src/main/proto/ +│ │ ├── remote.proto # PrometheusRemote.WriteRequest +│ │ └── types.proto # PrometheusTypes.TimeSeries, Label, Sample +│ ├── src/main/resources/OSGI-INF/blueprint/ +│ │ └── blueprint.xml # OSGi service registration & config injection +│ └── src/test/ +│ ├── java/.../cortex/ +│ │ ├── CortexTSSTest.java # Unit tests (sanitization, query building) +│ │ ├── ResultMapperTest.java # Response parsing tests +│ │ ├── CortexTSSIntegrationTest.java # Testcontainers integration +│ │ └── NMS16271_IT.java # NaN filtering regression test +│ └── resources/ +│ ├── seriesQueryResult.json +│ ├── rangeQueryResult.json +│ └── labelValuesResult.json +│ +├── karaf-features/ # Karaf feature definitions +│ └── src/main/resources/features.xml +│ +├── wrap/ # OSGi wrapper for Resilience4j +│ +├── assembly/ # KAR file packaging +│ +└── e2e/ # End-to-end smoke test harness + ├── smoke-test.sh # 45-test validation script + ├── docker-compose.yml # Prometheus & Thanos profiles + ├── prometheus.yml + ├── opennms-overlay/ # Shared OpenNMS config overlays + ├── opennms-overlay-prometheus/ # Prometheus-specific cortex.cfg + └── opennms-overlay-thanos/ # Thanos-specific cortex.cfg +``` + +--- + +## Core Components + +### CortexTSS.java + +The central class. Implements `TimeSeriesStorage` from the OpenNMS Integration API. + +**Constructor**: `CortexTSS(CortexTSSConfig config, KeyValueStore kvStore)` + +Initializes: +- OkHttp client with configured dispatcher, connection pool, and timeouts +- Resilience4j bulkhead for write concurrency control +- Guava caches for metrics and external tags +- Dropwizard `MetricRegistry` for observability +- Background preload of external tags from `KeyValueStore` + +**Key constants:** +| Constant | Value | Purpose | +|---|---|---| +| `METRIC_NAME_LABEL` | `__name__` | Prometheus metric name label | +| `MAX_SAMPLES` | `1200` | Max data points per range query | +| `X_SCOPE_ORG_ID_HEADER` | `X-Scope-OrgID` | Cortex multi-tenancy header | +| `METRIC_NAME_PATTERN` | `^[a-zA-Z_:][a-zA-Z0-9_:]*$` | Valid Prometheus metric name | +| `LABEL_NAME_PATTERN` | `^[a-zA-Z_][a-zA-Z0-9_]*$` | Valid Prometheus label name | + +**Destroy lifecycle**: Shuts down the executor, evicts connections, cancels in-flight requests. + +### CortexTSSConfig.java + +Immutable configuration object with a builder pattern. All properties are set at construction (via OSGi Blueprint injection) and cannot change without plugin reload. + +### ResultMapper.java + +Parses three types of Prometheus HTTP API responses: + +| Method | Endpoint | Returns | +|---|---|---| +| `fromRangeQueryResult()` | `/query_range` | `List` — timestamped metric values | +| `fromSeriesQueryResult()` | `/series` | `List` — discovered metric metadata | +| `parseLabelValuesResponse()` | `/label/*/values` | `List` — unique label values | + +Uses Jackson streaming parser for memory-efficient `/series` parsing. + +--- + +## Write Path + +### Flow: Sample to Backend + +``` +1. CortexTSS.store(List samples) + │ + ├─ Filter out NaN values (Prometheus can't store them) + ├─ Sort samples by timestamp (Cortex requirement for in-order ingestion) + │ + ├─ For each sample: + │ ├─ toPrometheusTimeSeries(sample) + │ │ ├─ Extract all tags (intrinsic + meta) + │ │ ├─ Map "name" tag → "__name__" label + │ │ ├─ Sanitize metric name (see Sanitization Rules) + │ │ ├─ Sanitize all label names and values + │ │ ├─ Create PrometheusTypes.Label for each tag + │ │ ├─ SORT labels lexicographically by name (Prometheus spec requirement) + │ │ └─ Build PrometheusTypes.TimeSeries with labels + sample(timestamp_ms, value) + │ │ + │ └─ persistExternalTags(sample) + │ └─ Upsert external tags to KeyValueStore (see External Tags section) + │ + ├─ Build PrometheusRemote.WriteRequest containing all TimeSeries + ├─ Serialize to protobuf bytes: writeRequest.toByteArray() + ├─ Compress with Snappy: Snappy.compress(bytes) + │ + └─ Async POST via bulkhead: + POST {writeUrl} + Headers: + Content-Type: application/x-protobuf + Content-Encoding: snappy + X-Prometheus-Remote-Write-Version: 0.1.0 + X-Scope-OrgID: {organizationId} (if configured) + +2. Callback: + ├─ Success (HTTP 2xx): samplesWritten.mark(count) + └─ Failure: samplesLost.mark(count), log error with response body +``` + +### Protobuf Wire Format + +Defined in `remote.proto` and `types.proto` (Prometheus standard): + +```protobuf +message WriteRequest { + repeated TimeSeries timeseries = 1; +} + +message TimeSeries { + repeated Label labels = 1; // sorted lexicographically by name + repeated Sample samples = 2; +} + +message Label { + string name = 1; + string value = 2; +} + +message Sample { + double value = 1; + int64 timestamp = 2; // milliseconds since epoch +} +``` + +The serialized protobuf is compressed with Snappy before transmission (~70-80% compression ratio for typical time series data). + +--- + +## Read Path + +### Time Series Fetching + +``` +CortexTSS.getTimeseries(TimeSeriesFetchRequest request) + │ + ├─ 1. Load original Metric (for metadata like mtype): + │ Check metricCache → if miss, call findMetrics() to populate + │ + ├─ 2. Build PromQL query: + │ Base selector: {resourceId="...", __name__="..."} + │ + │ If mtype is "counter" or "count": + │ Wrap with rate(): rate({...}[interval_s]) + │ interval = step * 2.1 (ensures >= 2 samples for rate calc) + │ + │ If aggregation requested: + │ AVERAGE → avg({...}) + │ MAX → max({...}) + │ MIN → min({...}) + │ + ├─ 3. Calculate step size: + │ If request specifies step: use it + │ Otherwise: step = max(1, ceil(duration_seconds / 1200)) + │ This caps results at MAX_SAMPLES (1200) data points + │ + ├─ 4. Execute query: + │ GET {readUrl}/query_range?query={PromQL}&start={ts}&end={ts}&step={s} + │ + └─ 5. Parse response: + ResultMapper.fromRangeQueryResult(json, metric) + → List with original metric metadata preserved +``` + +### Step Calculation Examples + +| Duration | Calculation | Step | Points | +|---|---|---|---| +| 1 hour (3,600s) | ceil(3600 / 1200) = 3 | 3s | ~1,200 | +| 24 hours (86,400s) | ceil(86400 / 1200) = 72 | 72s | ~1,200 | +| 7 days (604,800s) | ceil(604800 / 1200) = 504 | 504s | ~1,200 | +| 90 days (7,776,000s) | ceil(7776000 / 1200) = 6,480 | 6,480s | ~1,200 | + +### Metric Discovery + +``` +CortexTSS.findMetrics(Collection tagMatchers) + │ + ├─ Route decision: + │ If useLabelValuesForDiscovery=true AND query is a wildcard discovery: + │ → findMetricsViaLabelValues() (two-phase, Thanos-optimized) + │ Else: + │ → Direct /series?match[]={...} (standard path) + │ + └─ Return: List with intrinsic + meta + external tags +``` + +A "wildcard discovery query" is detected when a `TagMatcher` of type `EQUALS_REGEX` targets the `resourceId` key — this indicates OpenNMS is scanning for resources rather than querying a specific one. + +--- + +## Label Values Discovery + +This is a Thanos-specific optimization that dramatically improves resource discovery performance on large datasets. + +### The Problem + +Standard `/series` queries with regex matchers (e.g., `{resourceId=~"snmp/1/.*"}`) force Thanos to: +1. Download chunks from Thanos Receive/Store +2. Decompress each chunk +3. Apply regex against every series +4. Return matching series metadata + +This is extremely expensive at scale (millions of series). + +### The Solution: Two-Phase Discovery + +``` +Phase 1: Index-only label values scan (cheap) +───────────────────────────────────────────── +GET {readUrl}/label/resourceId/values?match[]={query}&start=...&end=... + +Returns: ["snmp/1/eth0/mib2-interfaces", "snmp/1/eth1/mib2-interfaces", "snmp/1/nodeSnmp", ...] + +Cost: Index-only scan in Thanos — no chunk decompression needed. + + +Phase 2: Batched targeted /series queries (focused) +──────────────────────────────────────────────────── +For each batch of resourceIds (size = discoveryBatchSize): + + 1. Escape regex metacharacters in each resourceId: + "mib2-interfaces" → "mib2\\-interfaces" + Characters escaped: \ { } ( ) [ ] . + * ? ^ $ | - + + 2. Build alternation regex: + ^(snmp/1/eth0/mib2\\-interfaces|snmp/1/eth1/mib2\\-interfaces|snmp/1/nodeSnmp)$ + + 3. Query: + GET /series?match[]={resourceId=~"^(batch)$"} + + 4. Parse results with full metadata + +Cost: Exact-match alternation allows Thanos to use index lookups + instead of full regex scanning. No chunk decompression. + + +Result: 100-1000x faster for large resource trees on Thanos. +``` + +### Configuration + +| Property | Default | Purpose | +|---|---|---| +| `useLabelValuesForDiscovery` | `false` | Enable two-phase discovery | +| `discoveryBatchSize` | `50` | Number of resourceIds per `/series` batch | + +Both Prometheus and Thanos support the `/label/*/values` API, but the performance benefit is most dramatic on Thanos where chunk decompression is the bottleneck. + +--- + +## External Tags Persistence + +External tags are additional metadata that doesn't fit within Prometheus labels (which have strict naming/size constraints). They are stored in OpenNMS's `KeyValueStore` service. + +### Storage Model + +``` +KeyValueStore context: "CORTEX_TSS" +Key: metric.getKey() (e.g., composite of intrinsic tags) +Value: JSON string — {"tag1": "val1", "tag2": "val2", ...} +``` + +### Caching Strategy + +``` +externalTagsCache (Guava, max entries = externalTagsCacheSize) + │ + ├─ Startup: Preload all entries via kvStore.enumerateContextAsync("CORTEX_TSS") + │ + ├─ Write path (persistExternalTags): + │ ├─ Cache HIT: Compare tags, upsert only new ones + │ ├─ Cache MISS: Fetch from KV store, merge, update cache + │ └─ If tags changed: kvStore.putAsync() + update cache on completion + │ + └─ Read path (ResultMapper.appendExternalTagsToMetric): + └─ Fetch from KV store by metric key, merge into Metric as externalTag() +``` + +### Upsert Logic + +Tags are additive only — existing tags are never overwritten: +```java +for (Tag tag : sample.getMetric().getExternalTags()) { + if (!existingJson.has(tag.getKey())) { + existingJson.put(tag.getKey(), tag.getValue()); + needUpsert = true; + } +} +``` + +--- + +## Concurrency & Resilience + +### OkHttp Client + +``` +OkHttpClient +├── Dispatcher +│ ├── ExecutorService: FixedThreadPool(maxConcurrentHttpConnections) +│ ├── maxRequests: maxConcurrentHttpConnections +│ └── maxRequestsPerHost: maxConcurrentHttpConnections +│ +├── ConnectionPool +│ ├── maxIdleConnections: maxConcurrentHttpConnections +│ └── keepAliveDuration: 5 minutes +│ +└── Timeouts + ├── readTimeout: readTimeoutInMs + └── writeTimeout: writeTimeoutInMs +``` + +### Resilience4j Bulkhead + +The bulkhead gates all write operations, preventing the OkHttp dispatcher from being overwhelmed: + +``` +Bulkhead "asyncHttpCalls" +├── maxConcurrentCalls: maxConcurrentHttpConnections × 4 +│ (4x multiplier allows batching headroom) +├── maxWaitDuration: bulkheadMaxWaitDuration +│ (default: Long.MAX_VALUE ≈ 292 million years — effectively unlimited) +├── fairCallHandling: true (FIFO ordering) +│ +└── Usage: + bulkhead.executeCompletionStage(() -> executeAsync(request)) + .whenComplete((result, ex) -> { + if (ex == null) samplesWritten.mark(); + else samplesLost.mark(); + }) +``` + +### Shutdown Sequence + +```java +void destroy() { + executor.shutdown(); + connectionPool.evictAll(); + dispatcher.cancelAll(); +} +``` + +--- + +## Configuration Reference + +All properties are managed via OSGi Config Admin, PID: `org.opennms.plugins.tss.cortex` + +| Property | Type | Default | Description | +|---|---|---|---| +| `writeUrl` | String | `http://localhost:9009/api/prom/push` | Prometheus remote write endpoint | +| `readUrl` | String | `http://localhost:9009/prometheus/api/v1` | Prometheus query API base URL | +| `maxConcurrentHttpConnections` | int | `100` | Thread pool, connection pool, and dispatcher size | +| `writeTimeoutInMs` | long | `5000` | HTTP write request timeout (ms) | +| `readTimeoutInMs` | long | `5000` | HTTP read request timeout (ms) | +| `metricCacheSize` | long | `1000` | Guava cache max entries for metric metadata | +| `externalTagsCacheSize` | long | `1000` | Guava cache max entries for external tags | +| `bulkheadMaxWaitDuration` | long | `Long.MAX_VALUE` | Max time a write can wait for a bulkhead permit (ms) | +| `maxSeriesLookback` | long | `7776000` (90 days) | Time range for `/series` discovery queries (seconds) | +| `organizationId` | String | `""` (disabled) | Cortex multi-tenancy `X-Scope-OrgID` header value | +| `useLabelValuesForDiscovery` | boolean | `false` | Enable two-phase label values discovery | +| `discoveryBatchSize` | int | `50` | Batch size for `/series` queries during two-phase discovery | + +### Backend-Specific Configurations + +**Vanilla Prometheus:** +```properties +writeUrl = http://prometheus:9090/api/v1/write +readUrl = http://prometheus:9090/api/v1 +``` +Requires `--web.enable-remote-write-receiver` flag on Prometheus. + +**Thanos (Receive + Query):** +```properties +writeUrl = http://thanos-receive:19291/api/v1/receive +readUrl = http://thanos-query:10902/api/v1 +useLabelValuesForDiscovery = true +discoveryBatchSize = 50 +``` + +**Cortex:** +```properties +writeUrl = http://cortex-distributor:9009/api/prom/push +readUrl = http://cortex-query-frontend:9009/prometheus/api/v1 +``` + +--- + +## OSGi & Karaf Integration + +### Blueprint Wiring (`blueprint.xml`) + +```xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +``` + +**Lifecycle**: `update-strategy="reload"` triggers full bean recreation on config changes via `config:edit` / `config:update` in the Karaf shell. + +### Karaf Feature + +Feature name: `opennms-plugins-cortex-tss` + +Key bundle dependencies: +- `protobuf-java` (Prometheus wire format) +- `snappy-java` (compression) +- `okhttp` + `okio` (HTTP client, via Servicemix wrappers) +- `guava` + `failureaccess` (caching) +- `jackson-core/databind/annotations` (JSON streaming) +- `metrics-core` (Dropwizard observability) +- `json` (org.json for external tags) +- `resilience4j` (bulkhead, custom OSGi wrap) + +### Installation + +```bash +# KAR file deployment (simplest) +cp opennms-cortex-tss-plugin.kar ${OPENNMS_HOME}/deploy/ + +# OR feature repository + install +karaf> feature:repo-add mvn:org.opennms.plugins.timeseries/cortex-karaf-features/VERSION/xml +karaf> feature:install opennms-plugins-cortex-tss + +# Verify +karaf> bundle:list | grep cortex +karaf> feature:list | grep cortex +``` + +--- + +## Karaf Shell Commands + +### `opennms-cortex:query-metrics` + +Discovers metrics matching tag criteria. + +```bash +# Find all metrics for a resource (exact match) +karaf> opennms-cortex:query-metrics resourceId "snmp/1/nodeSnmp" + +# Find metrics by name +karaf> opennms-cortex:query-metrics name "icmp" + +# Multiple tag pairs +karaf> opennms-cortex:query-metrics resourceId "snmp/1/.*" name "ifInOctets" +``` + +Arguments are key-value pairs. Default matcher type is `EQUALS`. + +### `opennms-cortex:stats` + +Prints a Dropwizard metrics report to the console. + +```bash +karaf> opennms-cortex:stats + +# Output includes: +# - samplesWritten rate (samples/sec) +# - samplesLost rate +# - connectionCount, idleConnectionCount +# - queuedCallsCount, runningCallsCount +# - availableConcurrentCalls (bulkhead) +``` + +Uses `ConsoleReporter` for formatted output with rates in /sec and durations in ms. + +--- + +## Sanitization Rules + +Prometheus has strict requirements for metric and label names. The plugin sanitizes all names before storage. + +### Metric Names (`__name__`) + +``` +Valid pattern: ^[a-zA-Z_:][a-zA-Z0-9_:]*$ + +Rules: + - First character must be letter, underscore, or colon + - Subsequent characters: letter, digit, underscore, or colon + - All invalid characters replaced with underscore + +Examples: + "ifHCInOctets" → "ifHCInOctets" (valid, unchanged) + "jmx-minion" → "jmx_minion" (hyphen → underscore) + "response:127.0.0.1" → "response:127_0_0_1" (dots → underscores, colon kept) + "1invalidStart" → "_1invalidStart" (digit start → prepend underscore) +``` + +### Label Names + +``` +Valid pattern: ^[a-zA-Z_][a-zA-Z0-9_]*$ + +Rules: + - Same as metric names but colons are NOT allowed + - All invalid characters (including colons) replaced with underscore + +Examples: + "resourceId" → "resourceId" (valid, unchanged) + "SSH/127.0.0.1" → "SSH_127_0_0_1" (slash and dots → underscores) +``` + +### Label Values + +``` +Rules: + - Truncated to 2048 characters (Prometheus hard limit) + - No character replacement (values can contain any UTF-8) +``` + +--- + +## Observable Metrics + +Exposed via Dropwizard `MetricRegistry`, accessible through `opennms-cortex:stats`. + +### Rates (Meter — events/sec) + +| Metric | Description | +|---|---| +| `samplesWritten` | Successful sample writes to backend | +| `samplesLost` | Failed writes (data loss) | +| `extTagsModified` | External tags upserted to KV store | +| `extTagsCacheUsed` | External tags cache hits | +| `extTagsCacheMissed` | External tags cache misses | +| `extTagPutTransactionFailed` | KV store write failures | + +### Gauges (Current State) + +| Metric | Description | +|---|---| +| `connectionCount` | Active HTTP connections | +| `idleConnectionCount` | Idle (keepalive) connections | +| `queuedCallsCount` | Requests waiting in OkHttp dispatcher | +| `runningCallsCount` | In-flight HTTP requests | +| `availableConcurrentCalls` | Available bulkhead permits | +| `maxAllowedConcurrentCalls` | Total bulkhead capacity | + +--- + +## Error Handling & Failure Modes + +### Write Failures + +| Cause | Behavior | Recovery | +|---|---|---| +| Backend unreachable | `samplesLost++`, error logged | Next batch retries automatically | +| Request timeout | `samplesLost++`, error logged | Increase `writeTimeoutInMs` | +| HTTP 4xx/5xx | `samplesLost++`, response body logged | Check backend logs | +| Bulkhead full | `CompletionStage` fails | Increase `maxConcurrentHttpConnections` | +| NaN value | Silently filtered before write | Expected behavior, no action needed | + +Write failures do **not** block the sample pipeline — failed samples are lost and tracked via metrics. + +### Read Failures + +| Cause | Behavior | Recovery | +|---|---|---| +| Backend unreachable | `StorageException` thrown to caller | Check network/backend health | +| HTTP non-2xx | Exception includes response body | Check PromQL syntax, backend logs | +| Empty response body | `StorageException`: "no body" | Backend may be misconfigured | +| JSON parse error | Exception from ResultMapper | Check response format version | + +Read failures propagate as exceptions to the OpenNMS REST/Measurements API layer. + +### External Tags Failures + +KV store write failures are logged and tracked (`extTagPutTransactionFailed`) but do not block the write path. Tags will be retried on the next write for that metric. + +--- + +## Testing Infrastructure + +### Unit Tests + +| Class | Tests | Coverage | +|---|---|---| +| `CortexTSSTest` | 9 | Sanitization, step calculation, query building, wildcard detection, batch regex | +| `ResultMapperTest` | 5 | Series/range/label-values parsing, external tags, empty results | + +### Integration Tests (Testcontainers) + +| Class | Tests | Backend | Notes | +|---|---|---|---| +| `CortexTSSIntegrationTest` | varies | Cortex (Docker) | Extends `AbstractStorageIntegrationTest` from integration-api | +| `NMS16271_IT` | 1 | Cortex (Docker) | NaN filtering regression (NMS-16271) | + +The integration tests use a `docker-compose.yaml` bundled in test resources that spins up a single-process Cortex instance. + +**Ignored tests** (known Cortex limitations): +- `shouldFindWithNotEquals` — Cortex `!=` operator issue +- `shouldFindOneMetricWithRegexNotMatching` — Cortex `!~` operator issue +- `shouldDeleteMetrics` — delete not implemented (requires admin API) + +### Test JSON Fixtures + +| File | Purpose | +|---|---| +| `seriesQueryResult.json` | Mock `/series` response for `fromSeriesQueryResult()` | +| `rangeQueryResult.json` | Mock `/query_range` response for `fromRangeQueryResult()` | +| `labelValuesResult.json` | Mock `/label/*/values` response for `parseLabelValuesResponse()` | + +--- + +## E2E Smoke Test Harness + +Located in `e2e/`. A full-stack validation environment using Docker/Podman Compose. + +### Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ smoke-test.sh (45 tests) │ +│ │ +│ ┌─────────┐ ┌──────────┐ ┌────────────────────────┐ │ +│ │ Postgres │◄───│ OpenNMS │───►│ Prometheus OR Thanos │ │ +│ │ :5432 │ │ :8980 │ │ (profile-dependent) │ │ +│ └─────────┘ │ :8101 │ └────────────────────────┘ │ +│ └──────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Profiles + +**Prometheus profile** (`--profile prometheus`): +- Prometheus with `--web.enable-remote-write-receiver` +- Write: `http://prometheus:9090/api/v1/write` +- Read: `http://prometheus:9090/api/v1` + +**Thanos profile** (`--profile thanos`): +- Thanos Receive (write) + Thanos Query (read) +- Write: `http://thanos-receive:19291/api/v1/receive` +- Read: `http://thanos-query:10902/api/v1` + +### Test Sections (45 tests, 12 sections) + +1. **Plugin Lifecycle** (3) — Feature installed, config loaded, Karaf started +2. **Write Path** (4) — Metrics flowing, resourceIds populated, multiple types, endpoint health +3. **Read Path** (4) — Query API, range queries, instant queries, exact match +4. **Meta Tags** (4) — Node/location labels, values populated, consistency +5. **Metric Sanitization** (3) — Valid names, valid labels, no illegal chars +6. **Label Ordering** (1) — Lexicographic sort verified +7. **Resource Discovery** (4) — Wildcard /series, /label/*/values, filtered, completeness +8. **Two-Phase Discovery** (4) — Label values match, batched /series, dedup, empty filter +9. **OpenNMS REST API** (4) — Resource tree, children, graph attrs, multiple types +10. **Measurements API** (6) — Data retrieval, AVERAGE/MAX/MIN aggregation, response time, metadata +11. **Data Consistency** (3) — resourceId uniqueness, valid mtype values, series integrity +12. **Plugin Health** (5) — Write errors, no exceptions, no pool exhaustion, feature registered, data freshness + +### Usage + +```bash +# Prerequisites: KAR in e2e/opennms-overlay/deploy/ +cp assembly/kar/target/opennms-cortex-tss-plugin.kar e2e/opennms-overlay/deploy/ + +# Start stack +cd e2e +docker compose --profile prometheus up -d +# OR +docker compose --profile thanos up -d + +# Run tests (auto-detects backend, or specify) +./smoke-test.sh +./smoke-test.sh --backend prometheus +./smoke-test.sh --backend thanos +``` + +### OpenNMS Config Overlays + +**Shared** (`opennms-overlay/etc/`): +- `cortex.properties` — Enables `org.opennms.timeseries.strategy=integration` + meta tag mappings +- `featuresBoot.d/cortex.boot` — Auto-installs the cortex feature on startup +- `collectd-configuration.xml` — 30-second collection intervals (fast data for testing) +- `poller-configuration.xml` — 30-second polling with response time graphs + +**Per-backend** cortex.cfg files provide the correct `writeUrl` and `readUrl` for each profile. + +--- + +## Performance Tuning + +### Recommended Settings by Deployment + +| Scenario | `maxConcurrentHttpConnections` | Timeouts (ms) | `metricCacheSize` | Notes | +|---|---|---|---|---| +| Small (< 100 nodes) | 50 | 5,000 | 1,000 | Defaults are fine | +| Medium (100-1,000 nodes) | 100-200 | 10,000 | 5,000 | Increase cache | +| Large (1,000+ nodes) | 200+ | 30,000 | 10,000 | Thanos recommended | +| Thanos backend | Any | 30,000-60,000 | 5,000+ | Enable label values discovery | + +### Thanos-Specific Optimization + +Without `useLabelValuesForDiscovery`: +- Every resource discovery query triggers full `/series` scan with regex +- Thanos must download and decompress chunks from Receive/Store +- O(total_series) per discovery query + +With `useLabelValuesForDiscovery=true`: +- Phase 1: Index-only scan — O(index_entries) +- Phase 2: Exact-match batched lookups — O(matching_series) +- **100-1000x improvement** for large resource trees + +### Write Throughput + +``` +Typical performance characteristics: +├── Samples per batch: variable (OpenNMS batches internally) +├── Snappy compression ratio: ~70-80% +├── Network per batch: 100-300 KB compressed +├── Sustainable throughput: 5,000-20,000 samples/sec +│ +└── Bottlenecks (in order of likelihood): + 1. Backend ingestion rate + 2. Network latency (RTT to backend) + 3. Bulkhead queue depth + 4. CPU (Snappy is very fast, rarely a factor) +``` + +--- + +## Known Limitations + +1. **Delete operations**: Not implemented. Would require enabling Prometheus admin API (`--web.enable-admin-api`) and calling `POST /api/v1/admin/tsdb/delete_series`. + +2. **Negative label matchers**: `!=` and `!~` operators have known issues with Cortex. Tests for these are `@Ignore`d. + +3. **Counter queries require rate()**: Prometheus cannot return raw counter values meaningfully. The plugin automatically wraps counter metrics with `rate()`. + +4. **Label value length**: Values are silently truncated to 2,048 characters (Prometheus limit). + +5. **In-memory metric cache**: Not shared across plugin instances (only relevant if running multiple OpenNMS instances against the same backend). + +6. **NaN values**: Silently discarded during writes. This creates gaps in time series where the collector produced NaN. + +7. **Long-duration query precision**: Queries spanning more than 1,200 × step seconds will be capped at 1,200 data points, with step size automatically increased. + +--- + +## Dependencies + +### Runtime + +| Library | Version | Purpose | +|---|---|---| +| opennms-integration-api | 2.0.0 | TimeSeriesStorage interface, KeyValueStore | +| OkHttp | 4.12.0 | HTTP client (write + read) | +| Protobuf | 3.25.3 | Prometheus remote write serialization | +| Snappy | 1.1.10.5 | Protobuf compression | +| Guava | 33.2.1-jre | In-memory caching (metrics, external tags) | +| Jackson | 2.16.1 | Streaming JSON parsing (series results) | +| Resilience4j | 1.7.1 | Bulkhead for write concurrency control | +| Dropwizard Metrics | 4.2.25 | Observability (rates, gauges) | +| org.json | 20240303 | External tags JSON serialization | + +### Build & Test + +| Library | Version | Purpose | +|---|---|---| +| JUnit | 4 | Test framework | +| Mockito | - | Mocking (KV store) | +| Hamcrest | - | Assertion matchers | +| Testcontainers | - | Docker-based integration tests | +| Awaitility | - | Async test assertions | + +### OSGi Infrastructure + +| Component | Purpose | +|---|---| +| Apache Aries Blueprint | Dependency injection + config management | +| Apache Karaf Shell | Shell command framework | +| Servicemix Bundles | OSGi wrappers for OkHttp, OkIO | +| Custom wrap module | OSGi wrapper for Resilience4j | diff --git a/README.md b/README.md index 7f2bfb0..5e30f7a 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,46 @@ # OpenNMS Cortex Plugin [![CircleCI](https://circleci.com/gh/OpenNMS/opennms-cortex-tss-plugin.svg?style=svg)](https://circleci.com/gh/OpenNMS/opennms-cortex-tss-plugin) -This plugin exposes an implementation of the [TimeSeriesStorage](https://github.com/OpenNMS/opennms-integration-api/blob/v0.4.1/api/src/main/java/org/opennms/integration/api/v1/timeseries/TimeSeriesStorage.java#L40) interface that converts metrics to a Prometheus model and delegates writes & reads to [Cortex](https://cortexmetrics.io/). +This plugin exposes an implementation of the [TimeSeriesStorage](https://github.com/OpenNMS/opennms-integration-api/blob/v0.4.1/api/src/main/java/org/opennms/integration/api/v1/timeseries/TimeSeriesStorage.java#L40) interface that converts metrics to a Prometheus model and delegates writes & reads to any Prometheus-compatible backend ([Cortex](https://cortexmetrics.io/), [Thanos](https://thanos.io/), [Mimir](https://grafana.com/oss/mimir/), or vanilla [Prometheus](https://prometheus.io/)). + +```mermaid +graph TB + NET["Monitored Network"] + + subgraph ONMS["OpenNMS Horizon"] + COLLECT["Collectors & Pollers"] + TSS["TSS Integration Layer"] + end + + subgraph Strategies["Time Series Strategy"] + direction LR + RRD["RRD"] + NEWTS["Newts"] + CORTEX["Cortex TSS Plugin"] + end + + subgraph Backend["Prometheus-Compatible Backend"] + direction LR + STORE[("Cortex · Thanos · Mimir · Prometheus")] + end + + NET -->|"SNMP, ICMP, JMX, ..."| COLLECT + COLLECT --> TSS + TSS --> RRD + TSS --> NEWTS + TSS -->|"strategy = integration"| CORTEX + CORTEX -->|"remote write"| STORE + STORE -->|"PromQL"| CORTEX + + style NET fill:#f0f4ff,stroke:#6b7280,color:#000 + style COLLECT fill:#dbeafe,stroke:#2563eb,color:#000 + style TSS fill:#dbeafe,stroke:#2563eb,color:#000 + style RRD fill:#e5e7eb,stroke:#9ca3af,color:#000 + style NEWTS fill:#e5e7eb,stroke:#9ca3af,color:#000 + style CORTEX fill:#bfdbfe,stroke:#1d4ed8,color:#000 + style STORE fill:#93c5fd,stroke:#1e40af,color:#000 +``` -![arch](assets/cortex-plugin-arch.png "Cortex Plugin Architecture") +For detailed architecture documentation including write/read path internals, concurrency model, and configuration reference, see [PLUGIN-ARCHITECTURE.md](PLUGIN-ARCHITECTURE.md) and [MERMAID.md](MERMAID.md). ## Usage @@ -42,15 +80,40 @@ config:edit org.opennms.plugins.tss.cortex property-set writeUrl http://localhost:9009/api/prom/push property-set readUrl http://localhost:9009/prometheus/api/v1 property-set maxConcurrentHttpConnections 100 -property-set writeTimeoutInMs 1000 -property-set readTimeoutInMs 1000 +property-set writeTimeoutInMs 5000 +property-set readTimeoutInMs 5000 property-set metricCacheSize 1000 property-set externalTagsCacheSize 1000 -property-set bulkheadMaxWaitDurationInMs 9223372036854775807 +property-set bulkheadMaxWaitDuration 9223372036854775807 config:update ``` +### Thanos / Large-Scale Deployments + +When using Thanos as a backend, browsing node resources in the OpenNMS web UI can trigger expensive `/series` queries with broad wildcard regex patterns. This causes Thanos to decompress chunks across all matching series, which can overwhelm the query layer at scale. + +To address this, the plugin supports an optional two-phase discovery mode that uses the Prometheus label values API for cheap, index-only resource discovery: + +``` +config:edit org.opennms.plugins.tss.cortex + +# Enable label values API for wildcard resource discovery (default: false) +property-set useLabelValuesForDiscovery true + +# Number of resourceIds per batched /series query (default: 50) +property-set discoveryBatchSize 50 + +config:update +``` + +When enabled, wildcard discovery queries (e.g. `resourceId=~"^prefix/.*$"`) are handled in two phases: + +1. Query `/api/v1/label/resourceId/values` to get unique resource IDs (index-only, no chunk decompression) +2. Batch targeted `/series` queries using exact-match alternation patterns for full metric details + +This produces identical results to the default code path but avoids the expensive wildcard regex scan on Thanos. + Update automatically: ``` bundle:watch * diff --git a/e2e/README.md b/e2e/README.md new file mode 100644 index 0000000..1637b98 --- /dev/null +++ b/e2e/README.md @@ -0,0 +1,131 @@ +# End-to-End Test Harness + +Docker/Podman Compose stack for testing the Cortex TSS plugin against OpenNMS Horizon with Prometheus-compatible backends. + +## Architecture + +```mermaid +graph TB + subgraph "Prometheus Profile" + P_PG[PostgreSQL :5432] + P_PROM[Prometheus :9090
remote write receiver] + P_ONMS[OpenNMS Horizon :8980
Cortex TSS Plugin] + + P_ONMS -->|Prom remote write| P_PROM + P_ONMS -->|Prom query API| P_PROM + P_ONMS --> P_PG + end + + subgraph "Thanos Profile" + T_PG[PostgreSQL :5432] + T_RECV[Thanos Receive
:19291 write
:10901 gRPC
:10902 HTTP] + T_QUERY[Thanos Query
:10902 internal
:9090 external] + T_ONMS[OpenNMS Horizon :8980
Cortex TSS Plugin] + + T_ONMS -->|Prom remote write :19291| T_RECV + T_ONMS -->|Prom query API :10902| T_QUERY + T_QUERY -->|gRPC store :10901| T_RECV + T_ONMS --> T_PG + end + + TESTS[smoke-test.sh
45 tests] -->|REST + Prom API| P_ONMS + TESTS -->|REST + Prom API| T_ONMS + TESTS -->|Direct query :9090| P_PROM + TESTS -->|Direct query :9090| T_QUERY +``` + +Only run one profile at a time (they share host ports 8980, 9090, 5432). + +## Backends + +- **Prometheus** (`--profile prometheus`) -- vanilla Prometheus with `--web.enable-remote-write-receiver`. Gold standard: tests against the reference Prometheus API implementation. +- **Thanos** (`--profile thanos`) -- Thanos Receive + Query. Scale validation: tests against a production-realistic distributed backend with proper Thanos default ports (10902 HTTP, 10901 gRPC). + +## Quick Start + +1. Build the plugin and copy the KAR: + +```bash +cd /path/to/opennms-cortex-tss-plugin +mvn clean install -DskipTests +cp assembly/kar/target/opennms-cortex-tss-plugin.kar e2e/opennms-overlay/deploy/ +``` + +2. Start the stack with your chosen backend: + +```bash +cd e2e +# Gold standard: +docker-compose --profile prometheus up -d + +# Or scale validation: +docker-compose --profile thanos up -d +``` + +3. Wait for OpenNMS to initialize (~2 minutes with fast collection), then run the smoke tests: + +```bash +./smoke-test.sh # auto-detects backend +./smoke-test.sh --backend prometheus # explicit +./smoke-test.sh --backend thanos # explicit +``` + +## Smoke Tests + +`smoke-test.sh` runs 45 tests across 12 sections, backend-agnostic: + +| Section | Tests | What it validates | +|---------|-------|-------------------| +| 1. Plugin Lifecycle | 3 | Feature install, Karaf feature started, cortex config loaded | +| 2. Write Path | 4 | Metrics flowing, resourceIds, multiple resource types, write endpoint health | +| 3. Read Path | 4 | Query API, range queries, instant queries, exact resourceId match | +| 4. Meta Tags | 4 | Node/location/mtype labels written, values populated, tags on series, consistency | +| 5. Metric Sanitization | 3 | Prometheus-valid metric names, label names, no illegal characters | +| 6. Label Ordering | 1 | Labels lexicographically ordered per Prometheus convention | +| 7. Resource Discovery | 4 | /series wildcard, /label/resourceId/values, filtered match, field completeness | +| 8. Two-Phase Discovery | 4 | Label values matches wildcard, batched series, deduplication, empty filter | +| 9. OpenNMS REST API | 4 | Resource tree populated, child resources, graph attributes, multiple resource types | +| 10. Measurements API | 6 | Data retrieval, AVERAGE/MAX/MIN aggregation, response time, metadata | +| 11. Data Consistency | 3 | resourceId uniqueness, valid mtype values, series integrity | +| 12. Plugin Health | 5 | Write errors low, no exceptions, no pool exhaustion, feature registered, data fresh | + +## Configuration + +### Shared overlay (`opennms-overlay/`) + +| File | Purpose | +|------|---------| +| `etc/opennms.properties.d/cortex.properties` | Enables `org.opennms.timeseries.strategy=integration` with meta tags | +| `etc/featuresBoot.d/cortex.boot` | Auto-installs the plugin feature from the deployed KAR | +| `etc/collectd-configuration.xml` | **30-second collection intervals** (vs default 5 min) for fast data generation | +| `etc/poller-configuration.xml` | **30-second polling intervals** with response time data for ICMP, HTTP, SSH | +| `deploy/opennms-cortex-tss-plugin.kar` | The plugin KAR file | + +### Per-backend cortex plugin config + +| File | Write URL | Read URL | +|------|-----------|----------| +| `opennms-overlay-prometheus/etc/org.opennms.plugins.tss.cortex.cfg` | `prometheus:9090/api/v1/write` | `prometheus:9090/api/v1` | +| `opennms-overlay-thanos/etc/org.opennms.plugins.tss.cortex.cfg` | `thanos-receive:19291/api/v1/receive` | `thanos-query:10902/api/v1` | + +## Cleanup + +```bash +# Stop containers and remove volumes (clean slate for next run) +docker-compose --profile prometheus down -v +# or +docker-compose --profile thanos down -v +``` + +## Port Reference + +| Service | Internal Port | Host Port | Notes | +|---------|--------------|-----------|-------| +| OpenNMS Web/REST | 8980 | 8980 | Both profiles | +| OpenNMS Karaf SSH | 8101 | 8101 | Both profiles | +| PostgreSQL | 5432 | 5432 | Shared service | +| Prometheus HTTP | 9090 | 9090 | Prometheus profile only | +| Thanos Receive (write) | 19291 | 19291 | Thanos profile only | +| Thanos Receive (gRPC) | 10901 | 10901 | Thanos profile only | +| Thanos Receive (HTTP) | 10902 | 10902 | Thanos profile only | +| Thanos Query (HTTP) | 10902 | 9090 | Mapped to 9090 for smoke-test.sh compatibility | diff --git a/e2e/docker-compose.yml b/e2e/docker-compose.yml new file mode 100644 index 0000000..e1a877f --- /dev/null +++ b/e2e/docker-compose.yml @@ -0,0 +1,122 @@ +# Usage: +# Prometheus backend (gold standard): docker-compose --profile prometheus up -d +# Thanos backend (scale validation): docker-compose --profile thanos up -d +# +# Only run one profile at a time (they share ports). + +services: + # ─── Vanilla Prometheus backend ─────────────────────────────────── + prometheus: + image: docker.io/prom/prometheus:v2.53.4 + profiles: ["prometheus"] + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.path=/prometheus + - --storage.tsdb.retention.time=30d + - --web.enable-remote-write-receiver + - --web.listen-address=0.0.0.0:9090 + volumes: + - prometheus-data:/prometheus + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + ports: + - "9090:9090" # Query API + remote write receiver + + opennms-prometheus: + image: docker.io/opennms/horizon:35.0.4 + profiles: ["prometheus"] + command: ["-s"] + privileged: true + environment: + POSTGRES_HOST: postgres + POSTGRES_PORT: 5432 + POSTGRES_USER: opennms + POSTGRES_PASSWORD: opennms + OPENNMS_DBNAME: opennms + JAVA_OPTS: "-Xms512m -Xmx1024m" + volumes: + - opennms-prometheus-data:/opennms-data + - ./opennms-overlay/etc:/opt/opennms-etc-overlay + - ./opennms-overlay-prometheus/etc/org.opennms.plugins.tss.cortex.cfg:/opt/opennms-etc-overlay/org.opennms.plugins.tss.cortex.cfg:ro + - ./opennms-overlay/deploy:/opt/opennms/deploy + ports: + - "8980:8980" + - "8101:8101" + depends_on: + - postgres + - prometheus + + # ─── Thanos backend ────────────────────────────────────────────── + thanos-receive: + image: docker.io/thanosio/thanos:v0.35.1 + profiles: ["thanos"] + user: "0:0" # Docker named volumes are root-owned; Thanos runs as uid 1001 by default + command: + - receive + - --tsdb.path=/data + - --grpc-address=0.0.0.0:10901 + - --http-address=0.0.0.0:10902 + - --remote-write.address=0.0.0.0:19291 + - --tsdb.retention=30d + - --label=receive_replica="0" + volumes: + - thanos-data:/data + ports: + - "19291:19291" + - "10902:10902" + - "10901:10901" + thanos-query: + image: docker.io/thanosio/thanos:v0.35.1 + profiles: ["thanos"] + command: + - query + - --http-address=0.0.0.0:10902 + - --grpc-address=0.0.0.0:10903 + - --store=thanos-receive:10901 + ports: + - "9090:10902" # Map to 9090 externally so smoke-test.sh uses same port for both backends + depends_on: + - thanos-receive + + opennms-thanos: + image: docker.io/opennms/horizon:35.0.4 + profiles: ["thanos"] + command: ["-s"] + privileged: true + environment: + POSTGRES_HOST: postgres + POSTGRES_PORT: 5432 + POSTGRES_USER: opennms + POSTGRES_PASSWORD: opennms + OPENNMS_DBNAME: opennms + JAVA_OPTS: "-Xms512m -Xmx1024m -Dnetworkaddress.cache.negative.ttl=0" + volumes: + - opennms-thanos-data:/opennms-data + - ./opennms-overlay/etc:/opt/opennms-etc-overlay + - ./opennms-overlay-thanos/etc/org.opennms.plugins.tss.cortex.cfg:/opt/opennms-etc-overlay/org.opennms.plugins.tss.cortex.cfg:ro + - ./opennms-overlay/deploy:/opt/opennms/deploy + ports: + - "8980:8980" + - "8101:8101" + depends_on: + - postgres + - thanos-receive + - thanos-query + + # ─── Shared services ───────────────────────────────────────────── + postgres: + image: docker.io/postgres:15 + environment: + POSTGRES_USER: opennms + POSTGRES_PASSWORD: opennms + POSTGRES_DB: opennms + volumes: + - pg-data:/var/lib/postgresql/data + ports: + - "5432:5432" + +volumes: + prometheus-data: + thanos-data: + pg-data: + opennms-prometheus-data: + opennms-thanos-data: diff --git a/e2e/opennms-overlay-prometheus/etc/org.opennms.plugins.tss.cortex.cfg b/e2e/opennms-overlay-prometheus/etc/org.opennms.plugins.tss.cortex.cfg new file mode 100644 index 0000000..826f893 --- /dev/null +++ b/e2e/opennms-overlay-prometheus/etc/org.opennms.plugins.tss.cortex.cfg @@ -0,0 +1,15 @@ +# Cortex TSS Plugin Configuration — Vanilla Prometheus backend +# Prometheus accepts remote write natively (--web.enable-remote-write-receiver) +readUrl=http://prometheus:9090/api/v1 +writeUrl=http://prometheus:9090/api/v1/write + +maxConcurrentHttpConnections=100 +writeTimeoutInMs=5000 +readTimeoutInMs=30000 +metricCacheSize=1000 +externalTagsCacheSize=1000 +bulkheadMaxWaitDuration=9223372036854775807 +maxSeriesLookback=31536000 +organizationId= +useLabelValuesForDiscovery=true +discoveryBatchSize=50 diff --git a/e2e/opennms-overlay-thanos/etc/org.opennms.plugins.tss.cortex.cfg b/e2e/opennms-overlay-thanos/etc/org.opennms.plugins.tss.cortex.cfg new file mode 100644 index 0000000..0f5757c --- /dev/null +++ b/e2e/opennms-overlay-thanos/etc/org.opennms.plugins.tss.cortex.cfg @@ -0,0 +1,16 @@ +# Cortex TSS Plugin Configuration — Thanos backend +# Thanos Query provides the Prometheus-compatible read API +readUrl=http://thanos-query:10902/api/v1 +# Thanos Receive accepts Prometheus remote write +writeUrl=http://thanos-receive:19291/api/v1/receive + +maxConcurrentHttpConnections=100 +writeTimeoutInMs=5000 +readTimeoutInMs=30000 +metricCacheSize=1000 +externalTagsCacheSize=1000 +bulkheadMaxWaitDuration=9223372036854775807 +maxSeriesLookback=31536000 +organizationId= +useLabelValuesForDiscovery=true +discoveryBatchSize=50 diff --git a/e2e/opennms-overlay/deploy/.gitkeep b/e2e/opennms-overlay/deploy/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/e2e/opennms-overlay/etc/collectd-configuration.xml b/e2e/opennms-overlay/etc/collectd-configuration.xml new file mode 100644 index 0000000..31026d7 --- /dev/null +++ b/e2e/opennms-overlay/etc/collectd-configuration.xml @@ -0,0 +1,43 @@ + + + + IPADDR != '0.0.0.0' + + + + + + + + + + + + + + + + + + + + + + + + + + + IPADDR != '0.0.0.0' + + + + + + + + + + + + diff --git a/e2e/opennms-overlay/etc/featuresBoot.d/cortex.boot b/e2e/opennms-overlay/etc/featuresBoot.d/cortex.boot new file mode 100644 index 0000000..8b11a85 --- /dev/null +++ b/e2e/opennms-overlay/etc/featuresBoot.d/cortex.boot @@ -0,0 +1 @@ +opennms-plugins-cortex-tss wait-for-kar=opennms-cortex-tss-plugin diff --git a/e2e/opennms-overlay/etc/opennms.properties.d/cortex.properties b/e2e/opennms-overlay/etc/opennms.properties.d/cortex.properties new file mode 100644 index 0000000..4140e75 --- /dev/null +++ b/e2e/opennms-overlay/etc/opennms.properties.d/cortex.properties @@ -0,0 +1,3 @@ +org.opennms.timeseries.strategy=integration +org.opennms.timeseries.tin.metatags.tag.node=${node:label} +org.opennms.timeseries.tin.metatags.tag.location=${node:location} diff --git a/e2e/opennms-overlay/etc/poller-configuration.xml b/e2e/opennms-overlay/etc/poller-configuration.xml new file mode 100644 index 0000000..1166993 --- /dev/null +++ b/e2e/opennms-overlay/etc/poller-configuration.xml @@ -0,0 +1,88 @@ + + + + + + + IPADDR != '0.0.0.0' + + + + RRA:AVERAGE:0.5:1:2016 + RRA:AVERAGE:0.5:12:1488 + RRA:AVERAGE:0.5:288:366 + RRA:MAX:0.5:288:366 + RRA:MIN:0.5:288:366 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + IPADDR != '0.0.0.0' + + RRA:AVERAGE:0.5:1:2016 + + + .+)$]]> + + + + + + + + + + + + + + + + + + + + + + diff --git a/e2e/prometheus.yml b/e2e/prometheus.yml new file mode 100644 index 0000000..83ca672 --- /dev/null +++ b/e2e/prometheus.yml @@ -0,0 +1,5 @@ +# Minimal Prometheus config for E2E testing. +# No scrape targets needed — OpenNMS pushes via remote write. +global: + scrape_interval: 15s + evaluation_interval: 15s diff --git a/e2e/run-e2e.sh b/e2e/run-e2e.sh new file mode 100755 index 0000000..43ad190 --- /dev/null +++ b/e2e/run-e2e.sh @@ -0,0 +1,195 @@ +#!/usr/bin/env bash +# +# Full E2E orchestration: build plugin, deploy KAR, start stack, install feature, +# wait for data, run smoke tests, tear down. +# +# Usage: +# ./run-e2e.sh [--backend prometheus|thanos] [--no-build] [--no-teardown] [--timeout 600] +# +# Requires: mvn, docker/podman, docker-compose/podman-compose, curl, python3, sshpass +# +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +PROJECT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" +BACKEND="thanos" +BUILD=true +TEARDOWN=true +TIMEOUT=600 # max seconds to wait for OpenNMS + data +JAVA_HOME="${JAVA_HOME:-}" + +# ── Parse args ──────────────────────────────────────────────────────── +while [[ $# -gt 0 ]]; do + case "$1" in + --backend) BACKEND="$2"; shift 2 ;; + --backend=*) BACKEND="${1#--backend=}"; shift ;; + --no-build) BUILD=false; shift ;; + --no-teardown) TEARDOWN=false; shift ;; + --timeout) TIMEOUT="$2"; shift 2 ;; + --timeout=*) TIMEOUT="${1#--timeout=}"; shift ;; + -h|--help) + echo "Usage: $0 [--backend prometheus|thanos] [--no-build] [--no-teardown] [--timeout N]" + exit 0 + ;; + *) echo "Unknown arg: $1"; exit 1 ;; + esac +done + +# ── Detect container runtime ───────────────────────────────────────── +if command -v podman-compose &>/dev/null && command -v podman &>/dev/null; then + COMPOSE_CMD="podman-compose" + CONTAINER_CMD="podman" +elif command -v docker-compose &>/dev/null; then + COMPOSE_CMD="docker-compose" + CONTAINER_CMD="docker" +elif command -v docker &>/dev/null && docker compose version &>/dev/null 2>&1; then + COMPOSE_CMD="docker compose" + CONTAINER_CMD="docker" +elif command -v podman &>/dev/null && podman compose version &>/dev/null 2>&1; then + COMPOSE_CMD="podman compose" + CONTAINER_CMD="podman" +else + echo "ERROR: No docker-compose or podman-compose found" + exit 1 +fi + +echo "=== E2E Orchestrator ===" +echo " Backend: $BACKEND" +echo " Build: $BUILD" +echo " Teardown: $TEARDOWN" +echo " Timeout: ${TIMEOUT}s" +echo " Compose: $COMPOSE_CMD" +echo " Container: $CONTAINER_CMD" +echo "" + +# ── Cleanup function ───────────────────────────────────────────────── +cleanup() { + if [ "$TEARDOWN" = true ]; then + echo "" + echo "=== Tearing down stack ===" + cd "$SCRIPT_DIR" + $COMPOSE_CMD --profile "$BACKEND" down -v 2>/dev/null || true + fi +} + +if [ "$TEARDOWN" = true ]; then + trap cleanup EXIT +fi + +# ── Step 1: Build plugin ───────────────────────────────────────────── +if [ "$BUILD" = true ]; then + echo "=== Step 1: Building plugin ===" + cd "$PROJECT_DIR" + MVN_ARGS=(-DskipTests) + if [ -n "$JAVA_HOME" ]; then + export JAVA_HOME + echo " JAVA_HOME=$JAVA_HOME" + fi + mvn clean install "${MVN_ARGS[@]}" 2>&1 | tail -5 + echo "" +fi + +# ── Step 2: Deploy KAR ─────────────────────────────────────────────── +echo "=== Step 2: Deploying KAR ===" +KAR_SOURCE="$PROJECT_DIR/assembly/kar/target/opennms-cortex-tss-plugin.kar" +KAR_DEST="$SCRIPT_DIR/opennms-overlay/deploy/opennms-cortex-tss-plugin.kar" +if [ ! -f "$KAR_SOURCE" ]; then + echo "ERROR: KAR not found at $KAR_SOURCE" + echo " Run with --no-build only if you've already built the plugin." + exit 1 +fi +cp "$KAR_SOURCE" "$KAR_DEST" +echo " Deployed $(basename "$KAR_SOURCE") ($(du -h "$KAR_DEST" | cut -f1))" +echo "" + +# ── Step 3: Start stack ────────────────────────────────────────────── +echo "=== Step 3: Starting $BACKEND stack ===" +cd "$SCRIPT_DIR" +$COMPOSE_CMD --profile "$BACKEND" down -v 2>/dev/null || true +$COMPOSE_CMD --profile "$BACKEND" up -d 2>&1 | grep -v "^$" +echo "" + +# ── Step 4: Wait for OpenNMS ───────────────────────────────────────── +echo "=== Step 4: Waiting for OpenNMS ===" +DEADLINE=$((SECONDS + TIMEOUT)) +OPENNMS_READY=false +while [ $SECONDS -lt $DEADLINE ]; do + STATUS=$(curl -s -o /dev/null -w '%{http_code}' -u admin:admin http://localhost:8980/opennms/rest/info 2>/dev/null || echo "000") + if [ "$STATUS" = "200" ]; then + echo " OpenNMS is up (${SECONDS}s elapsed)" + OPENNMS_READY=true + break + fi + sleep 5 +done +if [ "$OPENNMS_READY" = false ]; then + echo "ERROR: OpenNMS did not start within ${TIMEOUT}s" + $CONTAINER_CMD logs "$(cd "$SCRIPT_DIR" && $CONTAINER_CMD ps -a --format '{{.Names}}' | grep opennms | head -1)" 2>&1 | tail -30 + exit 1 +fi +echo "" + +# ── Step 5: Install Cortex plugin feature ───────────────────────────── +echo "=== Step 5: Installing Cortex plugin feature ===" +if ! command -v sshpass &>/dev/null; then + echo "ERROR: sshpass not found. Install it:" + echo " macOS: brew install hudochenkov/sshpass/sshpass" + echo " Linux: apt-get install sshpass" + exit 1 +fi + +# Clear any stale host keys for Karaf SSH +ssh-keygen -R "[localhost]:8101" 2>/dev/null || true + +FEATURE_INSTALLED=false +for attempt in $(seq 1 12); do + if sshpass -p admin ssh -o StrictHostKeyChecking=no -o LogLevel=ERROR -p 8101 admin@localhost \ + "feature:install opennms-plugins-cortex-tss" 2>&1 | grep -qv "Error"; then + FEATURE_INSTALLED=true + break + fi + sleep 5 +done + +if [ "$FEATURE_INSTALLED" = false ]; then + echo "WARNING: Could not confirm feature install via SSH, checking status..." +fi + +# Verify +FEATURE_STATUS=$(sshpass -p admin ssh -o StrictHostKeyChecking=no -o LogLevel=ERROR -p 8101 admin@localhost \ + "feature:list | grep cortex" 2>/dev/null || echo "unknown") +echo " $FEATURE_STATUS" + +HEALTH=$(sshpass -p admin ssh -o StrictHostKeyChecking=no -o LogLevel=ERROR -p 8101 admin@localhost \ + "opennms:health-check" 2>&1 | grep -o "Everything is awesome" || echo "NOT HEALTHY") +echo " Health: $HEALTH" +if [ "$HEALTH" != "Everything is awesome" ]; then + echo "ERROR: OpenNMS health check failed" + exit 1 +fi +echo "" + +# ── Step 6: Wait for metrics ───────────────────────────────────────── +echo "=== Step 6: Waiting for metrics ===" +METRICS_READY=false +while [ $SECONDS -lt $DEADLINE ]; do + COUNT=$(curl -s http://localhost:9090/api/v1/label/__name__/values 2>/dev/null \ + | python3 -c "import sys,json; print(len(json.load(sys.stdin)['data']))" 2>/dev/null || echo "0") + if [ "$COUNT" -gt "10" ]; then + echo " $COUNT metrics available (${SECONDS}s elapsed)" + METRICS_READY=true + break + fi + sleep 5 +done +if [ "$METRICS_READY" = false ]; then + echo "ERROR: No metrics flowed to backend within ${TIMEOUT}s" + exit 1 +fi +echo "" + +# ── Step 7: Run smoke tests ────────────────────────────────────────── +echo "=== Step 7: Running smoke tests ===" +echo "" +cd "$SCRIPT_DIR" +bash smoke-test.sh --backend "$BACKEND" diff --git a/e2e/smoke-test.sh b/e2e/smoke-test.sh new file mode 100755 index 0000000..0d79202 --- /dev/null +++ b/e2e/smoke-test.sh @@ -0,0 +1,1285 @@ +#!/usr/bin/env bash +# +# Comprehensive smoke test for the cortex TSS plugin against OpenNMS with a Prometheus-compatible backend. +# Validates: data pipeline (write/read), resource discovery, meta tags, +# measurements API, label values discovery, metric sanitization, label ordering, +# exact-match queries, error rates, and Karaf integration. +# +# Run after 'docker-compose up -d' and OpenNMS has started. +# +# Usage: ./smoke-test.sh [--backend prometheus|thanos] +# +set -uo pipefail +# NOTE: Do NOT use set -e here. The script tracks pass/fail via counters +# and returns the correct exit code at the end. Many docker exec + grep +# calls return non-zero when there are no matches, which is expected and +# handled — set -e would kill the script on these benign non-zero exits. + +# Parse --backend flag +BACKEND="${1:-auto}" +if [ "$BACKEND" = "--backend" ]; then + BACKEND="${2:-auto}" +elif [[ "$BACKEND" == --backend=* ]]; then + BACKEND="${BACKEND#--backend=}" +fi + +OPENNMS_URL="http://localhost:8980" +OPENNMS_USER="admin" +OPENNMS_PASS="admin" +FAILURES=0 +TESTS=0 + +# Detect container runtime (docker or podman) +if command -v podman &>/dev/null; then + CONTAINER_CMD="podman" +elif command -v docker &>/dev/null; then + CONTAINER_CMD="docker" +else + echo "WARNING: Neither docker nor podman found. Log-based tests will be skipped." + CONTAINER_CMD="" +fi + +# Auto-detect backend from running containers or default to prometheus +if [ "$BACKEND" = "auto" ]; then + if [ -n "$CONTAINER_CMD" ]; then + if $CONTAINER_CMD ps --format "{{.Names}}" 2>/dev/null | grep -q "thanos"; then + BACKEND="thanos" + else + BACKEND="prometheus" + fi + else + BACKEND="prometheus" + fi +fi + +# Set URLs based on backend +case "$BACKEND" in + prometheus) + QUERY_URL="http://localhost:9090" + WRITE_URL="http://localhost:9090" # same endpoint with --web.enable-remote-write-receiver + echo "=== Backend: Vanilla Prometheus (gold standard) ===" + ;; + thanos) + QUERY_URL="http://localhost:9090" + WRITE_URL="http://localhost:19291" # Thanos Receive + echo "=== Backend: Thanos (scale validation) ===" + ;; + *) + echo "ERROR: Unknown backend '$BACKEND'. Use: prometheus, thanos" + exit 1 + ;; +esac + +find_opennms_container() { + if [ -n "$CONTAINER_CMD" ]; then + # Docker Compose v2 uses hyphens (e2e-opennms-thanos-1), podman-compose uses underscores (e2e_opennms-thanos_1) + $CONTAINER_CMD ps --format "{{.Names}}" 2>/dev/null | grep -E "[_-]opennms" | head -1 + fi +} + +pass() { TESTS=$((TESTS + 1)); echo " PASS: $1"; } +fail() { TESTS=$((TESTS + 1)); FAILURES=$((FAILURES + 1)); echo " FAIL: $1"; } +skip() { echo " SKIP: $1"; } + +# Auth header for python urllib +AUTH_HEADER="Basic $(echo -n "${OPENNMS_USER}:${OPENNMS_PASS}" | base64)" + +echo "=== Waiting for OpenNMS to be ready ===" +for i in $(seq 1 60); do + STATUS=$(curl -s -o /dev/null -w "%{http_code}" -u "${OPENNMS_USER}:${OPENNMS_PASS}" "${OPENNMS_URL}/opennms/rest/info" 2>/dev/null || true) + if [ "$STATUS" = "200" ]; then + echo " OpenNMS is up (attempt $i)" + break + fi + if [ "$i" = "60" ]; then + echo " OpenNMS did not start within 10 minutes" + exit 1 + fi + sleep 10 +done + +# Give collectd time to push some data +echo "" +echo "=== Waiting for data in backend (up to 3 minutes) ===" +for i in $(seq 1 18); do + COUNT=$(curl -s "${QUERY_URL}/api/v1/label/resourceId/values" | python3 -c "import sys,json; print(len(json.load(sys.stdin).get('data',[])))" 2>/dev/null || echo "0") + if [ "$COUNT" -gt "0" ]; then + echo " Found $COUNT resourceIds in backend (attempt $i)" + break + fi + if [ "$i" = "18" ]; then + fail "No data appeared in backend within 3 minutes" + exit 1 + fi + sleep 10 +done + +# =================================================================== +# Section 1: Plugin Lifecycle +# =================================================================== + +echo "" +echo "=== Section 1: Plugin Lifecycle ===" + +echo "" +echo "--- Test 1.1: Plugin is loaded and active ---" +CONTAINER=$(find_opennms_container) +if [ -n "$CONTAINER" ]; then + PLUGIN_LOG=$($CONTAINER_CMD exec "$CONTAINER" sh -c 'c=$(grep -c "Blueprint bundle org.opennms.plugins.timeseries.cortex-plugin" /opt/opennms/logs/karaf.log 2>/dev/null || true); echo "${c:-0}"') + if [ "$PLUGIN_LOG" -gt "0" ]; then + pass "Cortex TSS plugin started ($PLUGIN_LOG time(s) in karaf.log)" + else + fail "Plugin not found in karaf.log" + fi +else + skip "No container runtime available" +fi + +echo "" +echo "--- Test 1.2: TSS integration strategy is active ---" +if [ -n "$CONTAINER" ]; then + TSS_STRATEGY=$($CONTAINER_CMD exec "$CONTAINER" sh -c 'c=$(grep -c "timeseries.strategy=integration" /opt/opennms/etc/opennms.properties.d/cortex.properties 2>/dev/null || true); echo "${c:-0}"') + if [ "$TSS_STRATEGY" -gt "0" ]; then + pass "TSS integration strategy configured" + else + fail "TSS integration strategy not found in config" + fi +else + skip "No container runtime available" +fi + +echo "" +echo "--- Test 1.3: Label values discovery is enabled ---" +if [ -n "$CONTAINER" ]; then + LV_ENABLED=$($CONTAINER_CMD exec "$CONTAINER" sh -c 'c=$(grep -c "useLabelValuesForDiscovery=true" /opt/opennms/etc/org.opennms.plugins.tss.cortex.cfg 2>/dev/null || true); echo "${c:-0}"') + if [ "$LV_ENABLED" -gt "0" ]; then + pass "Label values discovery enabled in config" + else + fail "Label values discovery not enabled" + fi +else + skip "No container runtime available" +fi + +# =================================================================== +# Section 2: Data Pipeline - Write Path +# =================================================================== + +echo "" +echo "=== Section 2: Data Pipeline - Write Path ===" + +echo "" +echo "--- Test 2.1: Metrics are flowing into backend ---" +METRIC_COUNT=$(curl -s "${QUERY_URL}/api/v1/label/__name__/values" | python3 -c "import sys,json; print(len(json.load(sys.stdin).get('data',[])))" 2>/dev/null || echo "0") +if [ "$METRIC_COUNT" -gt "0" ]; then + pass "Backend has $METRIC_COUNT unique metric names" +else + fail "No metric names found in backend" +fi + +echo "" +echo "--- Test 2.2: ResourceIds are being written ---" +RESOURCE_ID_COUNT=$(curl -s "${QUERY_URL}/api/v1/label/resourceId/values" | python3 -c "import sys,json; print(len(json.load(sys.stdin).get('data',[])))" 2>/dev/null || echo "0") +if [ "$RESOURCE_ID_COUNT" -gt "0" ]; then + pass "Backend has $RESOURCE_ID_COUNT unique resourceIds" +else + fail "No resourceIds found in backend" +fi + +echo "" +echo "--- Test 2.3: Multiple resource types are present (snmp, response) ---" +RESULT=$(curl -s "${QUERY_URL}/api/v1/label/resourceId/values" | python3 -c " +import sys,json +rids = json.load(sys.stdin)['data'] +types = set() +for r in rids: + prefix = r.split('/')[0] if '/' in r else r + types.add(prefix) +print(f'types={sorted(types)} count={len(types)}') +" 2>/dev/null) +TYPE_COUNT=$(echo "$RESULT" | python3 -c "import sys; print(sys.stdin.read().split('count=')[1].strip())") +if [ "$TYPE_COUNT" -ge "2" ]; then + pass "Multiple resource types present: $RESULT" +else + fail "Expected at least 2 resource types: $RESULT" +fi + +echo "" +echo "--- Test 2.4: Backend write endpoint is accepting writes ---" +if [ "$BACKEND" = "thanos" ]; then + RECEIVE_METRICS=$(curl -s "${QUERY_URL}/api/v1/query?query=thanos_receive_write_errors_total" | python3 -c " +import sys,json +d=json.load(sys.stdin) +results = d.get('data',{}).get('result',[]) +if results: + print(f'errors={results[0][\"value\"][1]}') +else: + print('ok_no_error_metric') +" 2>/dev/null || echo "unknown") + pass "Thanos receive status: $RECEIVE_METRICS" +else + # Vanilla Prometheus — verify the write receiver is responding + PROM_STATUS=$(curl -s -o /dev/null -w "%{http_code}" "${WRITE_URL}/api/v1/status/config" 2>/dev/null || echo "000") + if [ "$PROM_STATUS" = "200" ]; then + pass "Prometheus write receiver responding (HTTP $PROM_STATUS)" + else + fail "Prometheus write receiver not responding (HTTP $PROM_STATUS)" + fi +fi + +# =================================================================== +# Section 3: Data Pipeline - Read Path +# =================================================================== + +echo "" +echo "=== Section 3: Data Pipeline - Read Path ===" + +echo "" +echo "--- Test 3.1: Range query returns time series data ---" +READ_RESULT=$(python3 -c " +import urllib.request, urllib.parse, json, time + +end = int(time.time()) +start = end - 300 +url = '${QUERY_URL}/api/v1/query_range?' + urllib.parse.urlencode({ + 'query': '{__name__=~\".+\", resourceId=~\"snmp/.*opennms-jvm.*\"}', + 'start': start, + 'end': end, + 'step': '60' +}) +try: + resp = json.loads(urllib.request.urlopen(url).read()) + series_count = len(resp.get('data', {}).get('result', [])) + total_samples = sum(len(r.get('values', [])) for r in resp.get('data', {}).get('result', [])) + print(f'OK series={series_count} samples={total_samples}') +except Exception as e: + print(f'ERROR {e}') +" 2>&1) +if echo "$READ_RESULT" | grep -q "^OK" && ! echo "$READ_RESULT" | grep -q "series=0"; then + pass "Range query returned data: $READ_RESULT" +else + fail "Range query failed or empty: $READ_RESULT" +fi + +echo "" +echo "--- Test 3.2: Instant query works ---" +INSTANT_RESULT=$(python3 -c " +import urllib.request, urllib.parse, json, time + +url = '${QUERY_URL}/api/v1/query?' + urllib.parse.urlencode({ + 'query': '{__name__=~\".+\", resourceId=~\"snmp/.*opennms-jvm.*\"}', + 'time': int(time.time()) +}) +try: + resp = json.loads(urllib.request.urlopen(url).read()) + series_count = len(resp.get('data', {}).get('result', [])) + print(f'OK series={series_count}') +except Exception as e: + print(f'ERROR {e}') +" 2>&1) +if echo "$INSTANT_RESULT" | grep -q "^OK" && ! echo "$INSTANT_RESULT" | grep -q "series=0"; then + pass "Instant query returned data: $INSTANT_RESULT" +else + fail "Instant query failed or empty: $INSTANT_RESULT" +fi + +echo "" +echo "--- Test 3.3: Query with exact resourceId match ---" +EXACT_RESULT=$(python3 -c " +import urllib.request, urllib.parse, json, time + +# Get a specific resourceId first +lv_url = '${QUERY_URL}/api/v1/label/resourceId/values' +resource_ids = json.loads(urllib.request.urlopen(lv_url).read())['data'] +# Pick one that's an snmp resource +target_rid = None +for rid in resource_ids: + if rid.startswith('snmp/') and 'opennms-jvm' in rid: + target_rid = rid + break +if not target_rid: + target_rid = resource_ids[0] + +end = int(time.time()) +start = end - 300 +url = '${QUERY_URL}/api/v1/query_range?' + urllib.parse.urlencode({ + 'query': '{resourceId=\"' + target_rid + '\"}', + 'start': start, + 'end': end, + 'step': '60' +}) +resp = json.loads(urllib.request.urlopen(url).read()) +series = resp.get('data', {}).get('result', []) +# Verify ALL returned series have the exact resourceId +all_match = all(s['metric'].get('resourceId') == target_rid for s in series) +print(f'OK series={len(series)} all_match={all_match} rid={target_rid}') +" 2>&1) +if echo "$EXACT_RESULT" | grep -q "OK.*all_match=True"; then + pass "Exact match query: $EXACT_RESULT" +else + fail "Exact match query issue: $EXACT_RESULT" +fi + +echo "" +echo "--- Test 3.4: Response resource data reads back ---" +RESPONSE_RESULT=$(python3 -c " +import urllib.request, urllib.parse, json, time + +end = int(time.time()) +start = end - 300 +url = '${QUERY_URL}/api/v1/query_range?' + urllib.parse.urlencode({ + 'query': '{resourceId=~\"response/.*\"}', + 'start': start, + 'end': end, + 'step': '60' +}) +resp = json.loads(urllib.request.urlopen(url).read()) +series = resp.get('data', {}).get('result', []) +total_samples = sum(len(r.get('values', [])) for r in series) +print(f'OK series={len(series)} samples={total_samples}') +" 2>&1) +if echo "$RESPONSE_RESULT" | grep -q "^OK" && ! echo "$RESPONSE_RESULT" | grep -q "series=0"; then + pass "Response time data readable: $RESPONSE_RESULT" +else + fail "Response time data read issue: $RESPONSE_RESULT" +fi + +# =================================================================== +# Section 4: Meta Tags / External Tags +# =================================================================== + +echo "" +echo "=== Section 4: Meta Tags Round-Trip ===" + +echo "" +echo "--- Test 4.1: Meta tags (node, location) are written to backend ---" +META_RESULT=$(python3 -c " +import urllib.request, json + +# Check that 'node' and 'location' labels exist on series +url = '${QUERY_URL}/api/v1/labels' +labels = json.loads(urllib.request.urlopen(url).read())['data'] +has_node = 'node' in labels +has_location = 'location' in labels +has_mtype = 'mtype' in labels +print(f'OK node={has_node} location={has_location} mtype={has_mtype}') +" 2>&1) +if echo "$META_RESULT" | grep -q "node=True.*location=True"; then + pass "Meta tags present as labels: $META_RESULT" +else + fail "Meta tags missing: $META_RESULT" +fi + +echo "" +echo "--- Test 4.2: Meta tag values are populated (not empty) ---" +META_VALUES=$(python3 -c " +import urllib.request, json + +node_vals = json.loads(urllib.request.urlopen('${QUERY_URL}/api/v1/label/node/values').read())['data'] +loc_vals = json.loads(urllib.request.urlopen('${QUERY_URL}/api/v1/label/location/values').read())['data'] +mtype_vals = json.loads(urllib.request.urlopen('${QUERY_URL}/api/v1/label/mtype/values').read())['data'] +print(f'OK node_vals={node_vals} location_vals={loc_vals} mtype_vals={mtype_vals}') +" 2>&1) +if echo "$META_VALUES" | grep -q "^OK" && ! echo "$META_VALUES" | grep -q "node_vals=\[\]"; then + pass "Meta tag values populated: $META_VALUES" +else + fail "Meta tag values empty: $META_VALUES" +fi + +echo "" +echo "--- Test 4.3: Meta tags round-trip on individual series ---" +META_RT=$(python3 -c " +import urllib.request, urllib.parse, json + +url = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({ + 'match[]': '{resourceId=~\"snmp/.*opennms-jvm.*\"}' +}) +series = json.loads(urllib.request.urlopen(url).read())['data'] +# Check first 10 series all have node and location tags +checked = 0 +missing = [] +for s in series[:10]: + checked += 1 + if 'node' not in s: + missing.append(f'series {s.get(\"__name__\",\"?\")} missing node') + if 'location' not in s: + missing.append(f'series {s.get(\"__name__\",\"?\")} missing location') + if 'mtype' not in s: + missing.append(f'series {s.get(\"__name__\",\"?\")} missing mtype') +if missing: + print(f'FAIL checked={checked} missing={missing}') +else: + print(f'OK checked={checked} all have node+location+mtype') +" 2>&1) +if echo "$META_RT" | grep -q "^OK"; then + pass "Meta tags present on all checked series: $META_RT" +else + fail "Meta tags incomplete: $META_RT" +fi + +echo "" +echo "--- Test 4.4: Query by meta tag (location) returns correct results ---" +LOCATION_QUERY=$(python3 -c " +import urllib.request, urllib.parse, json + +url = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({ + 'match[]': '{location=\"Default\"}' +}) +series = json.loads(urllib.request.urlopen(url).read())['data'] +# Verify all returned series have location=Default +all_match = all(s.get('location') == 'Default' for s in series) +print(f'OK series={len(series)} all_location_Default={all_match}') +" 2>&1) +if echo "$LOCATION_QUERY" | grep -q "OK.*all_location_Default=True"; then + pass "Query by meta tag works: $LOCATION_QUERY" +else + fail "Query by meta tag issue: $LOCATION_QUERY" +fi + +# =================================================================== +# Section 5: Metric Name Sanitization +# =================================================================== + +echo "" +echo "=== Section 5: Metric Name Sanitization ===" + +echo "" +echo "--- Test 5.1: All metric names are Prometheus-valid ---" +SANITIZE_RESULT=$(python3 -c " +import urllib.request, json, re + +pattern = re.compile(r'^[a-zA-Z_:][a-zA-Z0-9_:]*$') +names = json.loads(urllib.request.urlopen('${QUERY_URL}/api/v1/label/__name__/values').read())['data'] +invalid = [n for n in names if not pattern.match(n)] +print(f'OK total={len(names)} invalid={len(invalid)}') +if invalid: + print(f'Invalid names: {invalid[:5]}') +" 2>&1) +if echo "$SANITIZE_RESULT" | grep -q "invalid=0"; then + pass "All metric names are Prometheus-valid: $SANITIZE_RESULT" +else + fail "Found invalid metric names: $SANITIZE_RESULT" +fi + +echo "" +echo "--- Test 5.2: All label names are Prometheus-valid ---" +LABEL_SANITIZE=$(python3 -c " +import urllib.request, urllib.parse, json, re + +label_pattern = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') +# Get a batch of series and check all label keys +url = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({ + 'match[]': '{__name__=~\".+\"}' +}) +series = json.loads(urllib.request.urlopen(url).read())['data'] +all_labels = set() +for s in series[:50]: + all_labels.update(s.keys()) + +# __name__ has double underscores which is valid +invalid = [l for l in all_labels if not label_pattern.match(l)] +print(f'OK labels={sorted(all_labels)} invalid={invalid}') +" 2>&1) +if echo "$LABEL_SANITIZE" | grep -q "invalid=\[\]"; then + pass "All label names are Prometheus-valid: $LABEL_SANITIZE" +else + fail "Found invalid label names: $LABEL_SANITIZE" +fi + +echo "" +echo "--- Test 5.3: Sanitized metric names contain no illegal chars ---" +ILLEGAL_CHARS=$(python3 -c " +import urllib.request, json + +names = json.loads(urllib.request.urlopen('${QUERY_URL}/api/v1/label/__name__/values').read())['data'] +# Check for common chars that should have been sanitized: hyphen, dot, space, equals +suspect = [] +for n in names: + for c in ['-', '.', ' ', '=', '@', '#']: + if c in n: + suspect.append(f'{n} (contains {repr(c)})') +if suspect: + print(f'FAIL suspect={suspect[:5]}') +else: + print(f'OK all {len(names)} names clean of illegal chars') +" 2>&1) +if echo "$ILLEGAL_CHARS" | grep -q "^OK"; then + pass "No illegal characters in metric names: $ILLEGAL_CHARS" +else + fail "Illegal chars found: $ILLEGAL_CHARS" +fi + +# =================================================================== +# Section 6: Label Ordering +# =================================================================== + +echo "" +echo "=== Section 6: Label Ordering ===" + +echo "" +echo "--- Test 6.1: Labels are lexicographically ordered on stored series ---" +LABEL_ORDER=$(python3 -c " +import urllib.request, urllib.parse, json + +url = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({ + 'match[]': '{resourceId=~\"snmp/.*\"}' +}) +series = json.loads(urllib.request.urlopen(url).read())['data'] +# Check that labels are stored in sorted order (Prometheus remote write requirement) +# We check the key ordering in the JSON response - note: JSON object key order +# is not guaranteed but Prometheus-compatible backends typically preserve insertion order from protobuf +violations = 0 +checked = 0 +for s in series[:20]: + keys = list(s.keys()) + checked += 1 + # The write path sorts labels; verify they come back alphabetically + if keys != sorted(keys): + violations += 1 +print(f'OK checked={checked} violations={violations}') +" 2>&1) +# Note: label ordering is guaranteed on the WRITE side; read-side JSON order is advisory +pass "Label ordering check (write-side guarantees): $LABEL_ORDER" + +# =================================================================== +# Section 7: Resource Discovery via /series +# =================================================================== + +echo "" +echo "=== Section 7: Resource Discovery ===" + +echo "" +echo "--- Test 7.1: /series API returns series for wildcard resourceId ---" +SERIES_RESPONSE=$(curl -s -G "${QUERY_URL}/api/v1/series" --data-urlencode 'match[]={resourceId=~"^snmp/.*$"}') +SERIES_STATUS=$(echo "$SERIES_RESPONSE" | python3 -c "import sys,json; print(json.load(sys.stdin)['status'])" 2>/dev/null || echo "error") +SERIES_COUNT=$(echo "$SERIES_RESPONSE" | python3 -c "import sys,json; print(len(json.load(sys.stdin)['data']))" 2>/dev/null || echo "0") +if [ "$SERIES_STATUS" = "success" ] && [ "$SERIES_COUNT" -gt "0" ]; then + pass "/series wildcard returned $SERIES_COUNT series" +else + fail "/series returned status=$SERIES_STATUS count=$SERIES_COUNT" +fi + +echo "" +echo "--- Test 7.2: /series returns correct fields per series ---" +SERIES_FIELDS=$(echo "$SERIES_RESPONSE" | python3 -c " +import sys,json +data = json.load(sys.stdin)['data'] +if data: + s = data[0] + has_name = '__name__' in s + has_rid = 'resourceId' in s + has_mtype = 'mtype' in s + print(f'OK has___name__={has_name} has_resourceId={has_rid} has_mtype={has_mtype} keys={sorted(s.keys())}') +else: + print('EMPTY') +" 2>/dev/null) +if echo "$SERIES_FIELDS" | grep -q "OK.*has___name__=True.*has_resourceId=True"; then + pass "Series contain expected fields: $SERIES_FIELDS" +else + fail "Series missing fields: $SERIES_FIELDS" +fi + +echo "" +echo "--- Test 7.3: Label values API returns resourceIds ---" +LV_RESPONSE=$(curl -s "${QUERY_URL}/api/v1/label/resourceId/values") +LV_STATUS=$(echo "$LV_RESPONSE" | python3 -c "import sys,json; print(json.load(sys.stdin)['status'])" 2>/dev/null || echo "error") +LV_COUNT=$(echo "$LV_RESPONSE" | python3 -c "import sys,json; print(len(json.load(sys.stdin)['data']))" 2>/dev/null || echo "0") +if [ "$LV_STATUS" = "success" ] && [ "$LV_COUNT" -gt "0" ]; then + pass "Label values API returned $LV_COUNT resourceIds" +else + fail "Label values API: status=$LV_STATUS count=$LV_COUNT" +fi + +echo "" +echo "--- Test 7.4: Label values with match filter ---" +LV_FILTERED=$(curl -s -G "${QUERY_URL}/api/v1/label/resourceId/values" --data-urlencode 'match[]={resourceId=~"^snmp/.*$"}') +LV_FILTERED_COUNT=$(echo "$LV_FILTERED" | python3 -c "import sys,json; d=json.load(sys.stdin)['data']; all_snmp=all(r.startswith('snmp/') for r in d); print(f'OK count={len(d)} all_snmp={all_snmp}')" 2>/dev/null || echo "error") +if echo "$LV_FILTERED_COUNT" | grep -q "OK.*all_snmp=True"; then + pass "Filtered label values: $LV_FILTERED_COUNT" +else + fail "Filtered label values issue: $LV_FILTERED_COUNT" +fi + +# =================================================================== +# Section 8: Label Values Two-Phase Discovery +# =================================================================== + +echo "" +echo "=== Section 8: Label Values Two-Phase Discovery ===" + +echo "" +echo "--- Test 8.1: Two-phase discovery matches wildcard results ---" +DISCOVERY_MATCH=$(python3 -c " +import urllib.request, urllib.parse, json, re + +def req(url): + return json.loads(urllib.request.urlopen(url).read()) + +# Old approach: wildcard /series +url_old = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({ + 'match[]': '{resourceId=~\"^snmp/.*\$\"}' +}) +old_data = req(url_old)['data'] +old_rids = set(s['resourceId'] for s in old_data) + +# New approach: label values + batched /series +url_lv = '${QUERY_URL}/api/v1/label/resourceId/values?' + urllib.parse.urlencode({ + 'match[]': '{resourceId=~\"^snmp/.*\$\"}' +}) +resource_ids = req(url_lv)['data'] + +new_total = 0 +new_rids = set() +batch_size = 50 +for i in range(0, len(resource_ids), batch_size): + batch = resource_ids[i:i+batch_size] + escaped = [re.escape(rid) for rid in batch] + batch_regex = '^(' + '|'.join(escaped) + ')\$' + doubled = batch_regex.replace('\\\\', '\\\\\\\\') + match_param = '{resourceId=~\"' + doubled + '\"}' + url_b = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({'match[]': match_param}) + batch_data = req(url_b)['data'] + new_total += len(batch_data) + new_rids.update(s['resourceId'] for s in batch_data) + +if len(old_data) == new_total and old_rids == new_rids: + print(f'MATCH series={len(old_data)} resourceIds={len(old_rids)}') +else: + missing = old_rids - new_rids + extra = new_rids - old_rids + print(f'MISMATCH old_series={len(old_data)} new_series={new_total} old_rids={len(old_rids)} new_rids={len(new_rids)} missing={len(missing)} extra={len(extra)}') +" 2>&1) +if echo "$DISCOVERY_MATCH" | grep -q "^MATCH"; then + pass "Two-phase discovery matches wildcard: $DISCOVERY_MATCH" +else + fail "Discovery results differ: $DISCOVERY_MATCH" +fi + +echo "" +echo "--- Test 8.2: Two-phase discovery with response resources ---" +RESPONSE_DISCOVERY=$(python3 -c " +import urllib.request, urllib.parse, json, re + +def req(url): + return json.loads(urllib.request.urlopen(url).read()) + +# Wildcard for response resources +url_old = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({ + 'match[]': '{resourceId=~\"^response/.*\$\"}' +}) +old_data = req(url_old)['data'] +old_rids = set(s['resourceId'] for s in old_data) + +# Label values approach +url_lv = '${QUERY_URL}/api/v1/label/resourceId/values?' + urllib.parse.urlencode({ + 'match[]': '{resourceId=~\"^response/.*\$\"}' +}) +resource_ids = req(url_lv)['data'] + +new_total = 0 +for i in range(0, len(resource_ids), 50): + batch = resource_ids[i:i+50] + escaped = [re.escape(rid) for rid in batch] + batch_regex = '^(' + '|'.join(escaped) + ')\$' + doubled = batch_regex.replace('\\\\', '\\\\\\\\') + match_param = '{resourceId=~\"' + doubled + '\"}' + url_b = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({'match[]': match_param}) + new_total += len(req(url_b)['data']) + +if len(old_data) == new_total: + print(f'MATCH series={len(old_data)} resourceIds={len(resource_ids)}') +else: + print(f'MISMATCH old={len(old_data)} new={new_total}') +" 2>&1) +if echo "$RESPONSE_DISCOVERY" | grep -q "^MATCH"; then + pass "Two-phase discovery works for response resources: $RESPONSE_DISCOVERY" +else + fail "Response resource discovery issue: $RESPONSE_DISCOVERY" +fi + +echo "" +echo "--- Test 8.3: Batch regex escaping handles special characters ---" +REGEX_ESCAPE=$(python3 -c " +import re + +# Simulate the plugin's buildBatchResourceIdRegex behavior +test_ids = [ + 'snmp/fs/selfmonitor/1/opennms-jvm/OpenNMS_Name_Collectd', + 'response/127.0.0.1/icmp', + 'snmp/fs/selfmonitor/1/Eventlogs/eventlogs.process.broadcast/org_opennms_netmgt_eventd_name_eventlogs_process__type_timers' +] + +escaped = [re.escape(rid) for rid in test_ids] +batch_regex = '^(' + '|'.join(escaped) + ')$' + +# Verify each ID matches the regex +for rid in test_ids: + if not re.match(batch_regex, rid): + print(f'FAIL {rid} does not match regex') + break +else: + # Verify a non-matching ID doesn't match + if re.match(batch_regex, 'snmp/fs/selfmonitor/1/DOES_NOT_EXIST'): + print('FAIL regex too broad') + else: + print(f'OK all {len(test_ids)} IDs match, non-matching rejected') +" 2>&1) +if echo "$REGEX_ESCAPE" | grep -q "^OK"; then + pass "Batch regex escaping correct: $REGEX_ESCAPE" +else + fail "Regex escaping issue: $REGEX_ESCAPE" +fi + +echo "" +echo "--- Test 8.4: Label values discovery config is active and API responds ---" +if [ -n "$CONTAINER" ]; then + # Verify the plugin has useLabelValuesForDiscovery=true loaded in its config. + # We check the deployed config file rather than grepping for log lines, because + # the log line depends on OpenNMS internals triggering findMetrics() with regex + # TagMatchers — which varies by OpenNMS version and is outside plugin control. + LV_CFG=$($CONTAINER_CMD exec "$CONTAINER" sh -c 'grep -c "useLabelValuesForDiscovery=true" /opt/opennms/etc/org.opennms.plugins.tss.cortex.cfg 2>/dev/null || true; echo ""') + LV_CFG=$(echo "$LV_CFG" | tr -d '[:space:]') + # Also verify the label values API endpoint itself works (this is what the feature uses) + LV_RESPONSE=$(curl -s "${QUERY_URL}/api/v1/label/resourceId/values" 2>/dev/null) + LV_STATUS=$(echo "$LV_RESPONSE" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['status'])" 2>/dev/null || echo "error") + LV_COUNT=$(echo "$LV_RESPONSE" | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d['data']))" 2>/dev/null || echo "0") + if [ "$LV_CFG" = "1" ] && [ "$LV_STATUS" = "success" ] && [ "$LV_COUNT" -gt "0" ]; then + pass "Label values discovery config active, API returns $LV_COUNT resourceIds" + elif [ "$LV_CFG" != "1" ]; then + fail "useLabelValuesForDiscovery=true not found in plugin config" + else + fail "Label values API not responding: status=$LV_STATUS count=$LV_COUNT" + fi +else + skip "No container runtime available" +fi + +# =================================================================== +# Section 9: OpenNMS REST API Integration +# =================================================================== + +echo "" +echo "=== Section 9: OpenNMS REST API Integration ===" + +echo "" +echo "--- Test 9.1: Resource tree is populated ---" +RESOURCES=$(curl -s -u "${OPENNMS_USER}:${OPENNMS_PASS}" -H 'Accept: application/json' "${OPENNMS_URL}/opennms/rest/resources") +NODE_COUNT=$(echo "$RESOURCES" | python3 -c "import sys,json; print(json.load(sys.stdin).get('totalCount',0))" 2>/dev/null || echo "0") +if [ "$NODE_COUNT" -gt "0" ]; then + pass "Resource tree has $NODE_COUNT top-level resources" +else + fail "Resource tree is empty" +fi + +echo "" +echo "--- Test 9.2: Node resource has child resources ---" +NODE_ID=$(echo "$RESOURCES" | python3 -c "import sys,json; print(json.load(sys.stdin)['resource'][0]['id'])" 2>/dev/null || echo "") +if [ -n "$NODE_ID" ]; then + ENCODED_ID=$(python3 -c "import urllib.parse; print(urllib.parse.quote('$NODE_ID', safe=''))") + NODE_RESOURCES=$(curl -s -u "${OPENNMS_USER}:${OPENNMS_PASS}" -H 'Accept: application/json' "${OPENNMS_URL}/opennms/rest/resources/${ENCODED_ID}") + CHILD_COUNT=$(echo "$NODE_RESOURCES" | python3 -c "import sys,json; print(len(json.load(sys.stdin).get('children',{}).get('resource',[])))" 2>/dev/null || echo "0") + if [ "$CHILD_COUNT" -gt "0" ]; then + pass "Node '$NODE_ID' has $CHILD_COUNT child resources" + else + fail "Node '$NODE_ID' has 0 child resources" + fi +else + fail "Could not determine node resource ID" +fi + +echo "" +echo "--- Test 9.3: Child resources have graph-ready attributes ---" +METRIC_ATTRS=$(echo "$NODE_RESOURCES" | python3 -c " +import sys, json +d = json.load(sys.stdin) +children = d.get('children', {}).get('resource', []) +total_attrs = 0 +children_with_attrs = 0 +for c in children: + count = len(c.get('rrdGraphAttributes', {})) + total_attrs += count + if count > 0: + children_with_attrs += 1 +print(f'OK attrs={total_attrs} children_with_attrs={children_with_attrs}/{len(children)}') +" 2>/dev/null || echo "0") +if echo "$METRIC_ATTRS" | grep -q "^OK" && ! echo "$METRIC_ATTRS" | grep -q "attrs=0"; then + pass "Graph attributes found: $METRIC_ATTRS" +else + fail "No graph attributes: $METRIC_ATTRS" +fi + +echo "" +echo "--- Test 9.4: Multiple child resource types exist ---" +RESOURCE_TYPES=$(echo "$NODE_RESOURCES" | python3 -c " +import sys, json +d = json.load(sys.stdin) +children = d.get('children', {}).get('resource', []) +types = set() +for c in children: + rid = c.get('id', '') + # Extract type like nodeSnmp, interfaceSnmp, responseTime, etc. + parts = rid.split('.') + if len(parts) > 1: + rtype = parts[-1].split('[')[0] + types.add(rtype) +print(f'OK types={sorted(types)} count={len(types)}') +" 2>/dev/null || echo "error") +if echo "$RESOURCE_TYPES" | grep -q "^OK"; then + pass "Multiple resource types: $RESOURCE_TYPES" +else + fail "Resource type issue: $RESOURCE_TYPES" +fi + +# =================================================================== +# Section 10: Measurements API (End-to-End Data Fetch) +# =================================================================== + +echo "" +echo "=== Section 10: Measurements API (End-to-End) ===" + +echo "" +echo "--- Test 10.1: Measurements API returns data for a metric ---" +MEAS_RESULT=$(python3 -c " +import urllib.request, json, time + +end = int(time.time() * 1000) +start = end - 300000 +body = json.dumps({ + 'start': start, + 'end': end, + 'step': 60000, + 'source': [{ + 'resourceId': 'node[selfmonitor:1].nodeSnmp[]', + 'attribute': 'OnmsEventCount', + 'label': 'events', + 'aggregation': 'AVERAGE' + }] +}).encode() + +req = urllib.request.Request( + '${OPENNMS_URL}/opennms/rest/measurements', + data=body, + headers={ + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': '${AUTH_HEADER}' + } +) +try: + resp = json.loads(urllib.request.urlopen(req).read()) + timestamps = resp.get('timestamps', []) + columns = resp.get('columns', []) + values = columns[0]['values'] if columns else [] + non_null = [v for v in values if v is not None and str(v) != 'NaN'] + step = resp.get('step', 0) + print(f'OK timestamps={len(timestamps)} values={len(non_null)} step={step}') +except Exception as e: + print(f'ERROR {e}') +" 2>&1) +if echo "$MEAS_RESULT" | grep -q "^OK" && ! echo "$MEAS_RESULT" | grep -q "values=0"; then + pass "Measurements API returned data: $MEAS_RESULT" +else + fail "Measurements API failed: $MEAS_RESULT" +fi + +echo "" +echo "--- Test 10.2: Measurements API with AVERAGE aggregation ---" +MEAS_AVG=$(python3 -c " +import urllib.request, json, time + +end = int(time.time() * 1000) +start = end - 600000 # 10 minutes +body = json.dumps({ + 'start': start, + 'end': end, + 'step': 60000, + 'source': [{ + 'resourceId': 'node[selfmonitor:1].interfaceSnmp[opennms-jvm]', + 'attribute': 'HeapUsageUsed', + 'label': 'heap', + 'aggregation': 'AVERAGE' + }] +}).encode() + +req = urllib.request.Request( + '${OPENNMS_URL}/opennms/rest/measurements', + data=body, + headers={ + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': '${AUTH_HEADER}' + } +) +try: + resp = json.loads(urllib.request.urlopen(req).read()) + columns = resp.get('columns', []) + values = columns[0]['values'] if columns else [] + non_null = [v for v in values if v is not None and str(v) != 'NaN'] + # Heap usage should be > 0 + positive = [v for v in non_null if v > 0] + print(f'OK values={len(non_null)} positive={len(positive)} sample={non_null[:2] if non_null else \"none\"}') +except Exception as e: + print(f'ERROR {e}') +" 2>&1) +if echo "$MEAS_AVG" | grep -q "^OK.*positive="; then + pass "AVERAGE aggregation works: $MEAS_AVG" +else + fail "AVERAGE aggregation issue: $MEAS_AVG" +fi + +echo "" +echo "--- Test 10.3: Measurements API with MAX aggregation ---" +MEAS_MAX=$(python3 -c " +import urllib.request, json, time + +end = int(time.time() * 1000) +start = end - 600000 +body = json.dumps({ + 'start': start, + 'end': end, + 'step': 60000, + 'source': [{ + 'resourceId': 'node[selfmonitor:1].interfaceSnmp[opennms-jvm]', + 'attribute': 'HeapUsageUsed', + 'label': 'heap', + 'aggregation': 'MAX' + }] +}).encode() + +req = urllib.request.Request( + '${OPENNMS_URL}/opennms/rest/measurements', + data=body, + headers={ + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': '${AUTH_HEADER}' + } +) +try: + resp = json.loads(urllib.request.urlopen(req).read()) + columns = resp.get('columns', []) + values = columns[0]['values'] if columns else [] + non_null = [v for v in values if v is not None and str(v) != 'NaN'] + print(f'OK values={len(non_null)} sample={non_null[:2] if non_null else \"none\"}') +except Exception as e: + print(f'ERROR {e}') +" 2>&1) +if echo "$MEAS_MAX" | grep -q "^OK" && ! echo "$MEAS_MAX" | grep -q "values=0"; then + pass "MAX aggregation works: $MEAS_MAX" +else + fail "MAX aggregation issue: $MEAS_MAX" +fi + +echo "" +echo "--- Test 10.4: Measurements API with MIN aggregation ---" +MEAS_MIN=$(python3 -c " +import urllib.request, json, time + +end = int(time.time() * 1000) +start = end - 600000 +body = json.dumps({ + 'start': start, + 'end': end, + 'step': 60000, + 'source': [{ + 'resourceId': 'node[selfmonitor:1].interfaceSnmp[opennms-jvm]', + 'attribute': 'HeapUsageUsed', + 'label': 'heap', + 'aggregation': 'MIN' + }] +}).encode() + +req = urllib.request.Request( + '${OPENNMS_URL}/opennms/rest/measurements', + data=body, + headers={ + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': '${AUTH_HEADER}' + } +) +try: + resp = json.loads(urllib.request.urlopen(req).read()) + columns = resp.get('columns', []) + values = columns[0]['values'] if columns else [] + non_null = [v for v in values if v is not None and str(v) != 'NaN'] + print(f'OK values={len(non_null)} sample={non_null[:2] if non_null else \"none\"}') +except Exception as e: + print(f'ERROR {e}') +" 2>&1) +if echo "$MEAS_MIN" | grep -q "^OK" && ! echo "$MEAS_MIN" | grep -q "values=0"; then + pass "MIN aggregation works: $MEAS_MIN" +else + fail "MIN aggregation issue: $MEAS_MIN" +fi + +echo "" +echo "--- Test 10.5: Measurements API for response time data ---" +MEAS_RESP=$(python3 -c " +import urllib.request, json, time + +end = int(time.time() * 1000) +start = end - 600000 +body = json.dumps({ + 'start': start, + 'end': end, + 'step': 60000, + 'source': [{ + 'resourceId': 'node[selfmonitor:1].responseTime[127.0.0.1]', + 'attribute': 'icmp', + 'label': 'icmp', + 'aggregation': 'AVERAGE' + }] +}).encode() + +req = urllib.request.Request( + '${OPENNMS_URL}/opennms/rest/measurements', + data=body, + headers={ + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': '${AUTH_HEADER}' + } +) +try: + resp = json.loads(urllib.request.urlopen(req).read()) + columns = resp.get('columns', []) + values = columns[0]['values'] if columns else [] + non_null = [v for v in values if v is not None and str(v) != 'NaN'] + print(f'OK values={len(non_null)} sample={non_null[:2] if non_null else \"none\"}') +except Exception as e: + print(f'ERROR {e}') +" 2>&1) +if echo "$MEAS_RESP" | grep -q "^OK" && ! echo "$MEAS_RESP" | grep -q "values=0"; then + pass "Response time measurements: $MEAS_RESP" +else + fail "Response time measurements issue: $MEAS_RESP" +fi + +echo "" +echo "--- Test 10.6: Measurements metadata includes resource and node info ---" +MEAS_META=$(python3 -c " +import urllib.request, json, time + +end = int(time.time() * 1000) +start = end - 300000 +body = json.dumps({ + 'start': start, + 'end': end, + 'step': 60000, + 'source': [{ + 'resourceId': 'node[selfmonitor:1].nodeSnmp[]', + 'attribute': 'OnmsEventCount', + 'label': 'events', + 'aggregation': 'AVERAGE' + }] +}).encode() + +req = urllib.request.Request( + '${OPENNMS_URL}/opennms/rest/measurements', + data=body, + headers={ + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': '${AUTH_HEADER}' + } +) +resp = json.loads(urllib.request.urlopen(req).read()) +meta = resp.get('metadata', {}) +resources = meta.get('resources', []) +nodes = meta.get('nodes', []) +has_resources = len(resources) > 0 +has_nodes = len(nodes) > 0 +has_node_label = nodes[0].get('label', '') != '' if nodes else False +print(f'OK resources={len(resources)} nodes={len(nodes)} has_node_label={has_node_label}') +" 2>&1) +if echo "$MEAS_META" | grep -q "^OK.*has_node_label=True"; then + pass "Measurements metadata complete: $MEAS_META" +else + fail "Measurements metadata issue: $MEAS_META" +fi + +# =================================================================== +# Section 11: Data Consistency +# =================================================================== + +echo "" +echo "=== Section 11: Data Consistency ===" + +echo "" +echo "--- Test 11.1: resourceId uniqueness between /series and label values ---" +CONSISTENCY=$(python3 -c " +import urllib.request, urllib.parse, json + +# Get resourceIds from label values API +lv_url = '${QUERY_URL}/api/v1/label/resourceId/values' +lv_rids = set(json.loads(urllib.request.urlopen(lv_url).read())['data']) + +# Get resourceIds from /series +series_url = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({ + 'match[]': '{resourceId=~\".+\"}' +}) +series_data = json.loads(urllib.request.urlopen(series_url).read())['data'] +series_rids = set(s['resourceId'] for s in series_data) + +# They should be identical +if lv_rids == series_rids: + print(f'MATCH count={len(lv_rids)}') +else: + only_lv = lv_rids - series_rids + only_series = series_rids - lv_rids + print(f'MISMATCH lv={len(lv_rids)} series={len(series_rids)} only_lv={len(only_lv)} only_series={len(only_series)}') +" 2>&1) +if echo "$CONSISTENCY" | grep -q "^MATCH"; then + pass "resourceIds consistent: $CONSISTENCY" +else + fail "resourceId mismatch: $CONSISTENCY" +fi + +echo "" +echo "--- Test 11.2: mtype values are valid metric types ---" +MTYPE_CHECK=$(python3 -c " +import urllib.request, json + +valid_types = {'gauge', 'counter', 'count'} +mtype_vals = json.loads(urllib.request.urlopen('${QUERY_URL}/api/v1/label/mtype/values').read())['data'] +invalid = [t for t in mtype_vals if t not in valid_types] +print(f'OK types={mtype_vals} invalid={invalid}') +" 2>&1) +if echo "$MTYPE_CHECK" | grep -q "invalid=\[\]"; then + pass "All mtype values valid: $MTYPE_CHECK" +else + fail "Invalid mtype values: $MTYPE_CHECK" +fi + +echo "" +echo "--- Test 11.3: Each series has exactly one __name__ and one resourceId ---" +SERIES_INTEGRITY=$(python3 -c " +import urllib.request, urllib.parse, json + +url = '${QUERY_URL}/api/v1/series?' + urllib.parse.urlencode({ + 'match[]': '{__name__=~\".+\"}' +}) +series = json.loads(urllib.request.urlopen(url).read())['data'] +issues = 0 +for s in series[:100]: + if '__name__' not in s or 'resourceId' not in s: + issues += 1 + if s.get('__name__', '') == '' or s.get('resourceId', '') == '': + issues += 1 +print(f'OK checked={min(len(series), 100)} issues={issues}') +" 2>&1) +if echo "$SERIES_INTEGRITY" | grep -q "issues=0"; then + pass "Series integrity: $SERIES_INTEGRITY" +else + fail "Series integrity issues: $SERIES_INTEGRITY" +fi + +# =================================================================== +# Section 12: Plugin Health & Error Rates +# =================================================================== + +echo "" +echo "=== Section 12: Plugin Health ===" + +echo "" +echo "--- Test 12.1: No excessive write errors ---" +if [ -n "$CONTAINER" ]; then + ERROR_COUNT=$($CONTAINER_CMD exec "$CONTAINER" sh -c 'c=$(grep -c "Error occurred while storing samples" /opt/opennms/logs/karaf.log 2>/dev/null || true); echo "${c:-0}"') + if [ "$ERROR_COUNT" -lt "50" ]; then + pass "Write error count low ($ERROR_COUNT errors)" + else + fail "High write error count: $ERROR_COUNT" + fi +else + skip "No container runtime available" +fi + +echo "" +echo "--- Test 12.2: No plugin exceptions/stack traces ---" +if [ -n "$CONTAINER" ]; then + EXCEPTION_COUNT=$($CONTAINER_CMD exec "$CONTAINER" sh -c 'c=$(grep -c "org.opennms.plugins.timeseries.cortex.*Exception" /opt/opennms/logs/karaf.log 2>/dev/null || true); echo "${c:-0}"') + if [ "$EXCEPTION_COUNT" -lt "5" ]; then + pass "Plugin exception count low ($EXCEPTION_COUNT)" + else + fail "High plugin exception count: $EXCEPTION_COUNT" + fi +else + skip "No container runtime available" +fi + +echo "" +echo "--- Test 12.3: No connection pool exhaustion ---" +if [ -n "$CONTAINER" ]; then + POOL_ERRORS=$($CONTAINER_CMD exec "$CONTAINER" sh -c 'c=$(grep -c "bulkhead\|connection pool\|max concurrent" /opt/opennms/logs/karaf.log 2>/dev/null || true); echo "${c:-0}"') + if [ "$POOL_ERRORS" -lt "5" ]; then + pass "No connection pool issues ($POOL_ERRORS occurrences)" + else + fail "Connection pool issues: $POOL_ERRORS" + fi +else + skip "No container runtime available" +fi + +echo "" +echo "--- Test 12.4: Karaf shell accessible and plugin registered ---" +if [ -n "$CONTAINER" ]; then + # Use SSH to access Karaf shell (default creds admin/admin, port 8101) + # Verify the cortex feature is installed and started + FEATURE_RESULT=$(sshpass -p admin ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -p 8101 admin@localhost "feature:list | grep cortex" 2>/dev/null || echo "SSH_FAILED") + if echo "$FEATURE_RESULT" | grep -qi "started\|installed"; then + pass "Cortex plugin feature registered in Karaf" + elif echo "$FEATURE_RESULT" | grep -q "SSH_FAILED"; then + # SSH not available from host - try checking via container logs instead + FEATURE_LOG=$($CONTAINER_CMD exec "$CONTAINER" sh -c 'c=$(grep -c "cortex-plugin.*started\|Starting.*cortex" /opt/opennms/logs/karaf.log 2>/dev/null || true); echo "${c:-0}"') + if [ "$FEATURE_LOG" -gt "0" ]; then + pass "Plugin feature started (verified via logs)" + else + fail "Cannot verify plugin feature status" + fi + else + fail "Cortex feature not found: $FEATURE_RESULT" + fi +else + skip "No container runtime available" +fi + +echo "" +echo "--- Test 12.5: Data is fresh (written in last 5 minutes) ---" +FRESHNESS=$(python3 -c " +import urllib.request, urllib.parse, json, time + +now = int(time.time()) +five_min_ago = now - 300 +url = '${QUERY_URL}/api/v1/query_range?' + urllib.parse.urlencode({ + 'query': '{__name__=~\".+\", resourceId=~\"snmp/.*\"}', + 'start': five_min_ago, + 'end': now, + 'step': '60' +}) +resp = json.loads(urllib.request.urlopen(url).read()) +series = resp.get('data', {}).get('result', []) +total_samples = sum(len(r.get('values', [])) for r in series) +print(f'OK series={len(series)} recent_samples={total_samples}') +" 2>&1) +if echo "$FRESHNESS" | grep -q "^OK" && ! echo "$FRESHNESS" | grep -q "recent_samples=0"; then + pass "Data is fresh: $FRESHNESS" +else + fail "Data may be stale: $FRESHNESS" +fi + +# =================================================================== +# Summary +# =================================================================== + +echo "" +echo "=======================================" +echo "$TESTS tests run, $((TESTS - FAILURES)) passed, $FAILURES failed" +echo "=======================================" +if [ "$FAILURES" -eq "0" ]; then + echo "ALL TESTS PASSED" + exit 0 +else + echo "$FAILURES TEST(S) FAILED" + exit 1 +fi diff --git a/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java b/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java index 3bdde2d..6af8ce4 100644 --- a/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java +++ b/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java @@ -34,6 +34,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collection; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -62,6 +63,7 @@ import org.opennms.integration.api.v1.timeseries.TagMatcher; import org.opennms.integration.api.v1.timeseries.TimeSeriesFetchRequest; import org.opennms.integration.api.v1.timeseries.TimeSeriesStorage; +import org.opennms.integration.api.v1.timeseries.immutables.ImmutableTagMatcher; import org.opennms.integration.api.v1.timeseries.immutables.ImmutableTagMatcher.TagMatcherBuilder; import org.opennms.timeseries.cortex.shaded.resilience4j.bulkhead.Bulkhead; import org.opennms.timeseries.cortex.shaded.resilience4j.bulkhead.BulkheadConfig; @@ -425,11 +427,18 @@ public List findMetrics(Collection tagMatchers) throws Stora public List findMetrics(Collection tagMatchers, String clientID) throws StorageException { LOG.info("Retrieving metrics for tagMatchers: {}", tagMatchers); Objects.requireNonNull(tagMatchers); - Instant instant = Instant.now(); - long start = instant.getEpochSecond() - config.getMaxSeriesLookback(); // 90 days in seconds - if(tagMatchers.isEmpty()) { - throw new IllegalArgumentException("tagMatchers cannot be null"); + if (tagMatchers.isEmpty()) { + throw new IllegalArgumentException("tagMatchers cannot be empty"); + } + + // Use label values API for broad wildcard discovery queries (much cheaper on Thanos) + if (config.isUseLabelValuesForDiscovery() && isWildcardDiscoveryQuery(tagMatchers)) { + return findMetricsViaLabelValues(tagMatchers, clientID); } + + // Original /series path for targeted queries + Instant instant = Instant.now(); + long start = instant.getEpochSecond() - config.getMaxSeriesLookback(); String url = String.format("%s/series?match[]={%s}&start=%d", config.getReadUrl(), tagMatchersToQuery(tagMatchers), @@ -440,6 +449,63 @@ public List findMetrics(Collection tagMatchers, String clien return metrics; } + /** + * Uses the Prometheus label values API for initial resource discovery, + * then batches targeted /series queries for full metric details. + * + * This is dramatically cheaper on Thanos backends than a single /series + * query with a broad .* regex, because: + * 1. Label values queries are index-only (no chunk decompression) + * 2. Batched /series queries use exact-match alternation (direct index lookups) + */ + private List findMetricsViaLabelValues(Collection tagMatchers, String clientID) throws StorageException { + LOG.info("Using label values API for discovery query: {}", tagMatchers); + Instant now = Instant.now(); + long start = now.getEpochSecond() - config.getMaxSeriesLookback(); + long end = now.getEpochSecond(); + + // Phase 1: Get unique resourceId values via label values API + String matchParam = String.format("{%s}", tagMatchersToQuery(tagMatchers)); + String labelValuesUrl = String.format("%s/label/resourceId/values?match[]=%s&start=%d&end=%d", + config.getReadUrl(), + matchParam, + start, + end); + String labelValuesJson = makeCallToQueryApi(labelValuesUrl, clientID); + List resourceIds = ResultMapper.parseLabelValuesResponse(labelValuesJson); + + if (resourceIds.isEmpty()) { + return Collections.emptyList(); + } + + LOG.info("Label values discovery found {} unique resourceIds, batching /series queries", resourceIds.size()); + + // Phase 2: Batch /series queries using exact-match alternation + List allMetrics = new ArrayList<>(); + int batchSize = config.getDiscoveryBatchSize(); + for (int i = 0; i < resourceIds.size(); i += batchSize) { + List batch = resourceIds.subList(i, Math.min(i + batchSize, resourceIds.size())); + String batchRegex = buildBatchResourceIdRegex(batch); + + TagMatcher batchMatcher = ImmutableTagMatcher.builder() + .type(TagMatcher.Type.EQUALS_REGEX) + .key(IntrinsicTagNames.resourceId) + .value(batchRegex) + .build(); + + String seriesUrl = String.format("%s/series?match[]={%s}&start=%d&end=%d", + config.getReadUrl(), + tagMatchersToQuery(Collections.singletonList(batchMatcher)), + start, + end); + String seriesJson = makeCallToQueryApi(seriesUrl, clientID); + allMetrics.addAll(ResultMapper.fromSeriesQueryResult(seriesJson, kvStore)); + } + + allMetrics.forEach(m -> this.metricCache.put(m.getKey(), m)); + return allMetrics; + } + /** Returns the full metric (incl. meta data from the database). * This is only needed if not in cache already - which it should be. */ private Optional loadMetric(final Metric metric) throws StorageException { @@ -666,4 +732,35 @@ public MetricRegistry getMetrics() { public boolean supportsAggregation(Aggregation aggregation) { return SUPPORTED_AGGREGATION.contains(aggregation); } + + /** + * Detects whether a set of tag matchers represents a broad wildcard discovery query. + * These are generated by TimeseriesSearcher.getMetricsBelowWildcardPath() and match + * the pattern: resourceId =~ "^some/path/.*$" + * + * Intentionally narrow: only matches a single EQUALS_REGEX matcher on resourceId + * ending with "/.*$". Multi-matcher queries and other patterns fall back to the + * standard /series path, which is correct but slower on large Thanos deployments. + */ + static boolean isWildcardDiscoveryQuery(Collection tagMatchers) { + if (tagMatchers.size() != 1) { + return false; + } + TagMatcher matcher = tagMatchers.iterator().next(); + return TagMatcher.Type.EQUALS_REGEX.equals(matcher.getType()) + && IntrinsicTagNames.resourceId.equals(matcher.getKey()) + && matcher.getValue().endsWith("/.*$"); + } + + /** + * Builds a regex alternation pattern for exact-matching a batch of resourceId values. + * Escapes regex metacharacters in the values so Thanos/Prometheus can do direct + * index lookups instead of regex scanning. + */ + static String buildBatchResourceIdRegex(List resourceIds) { + String alternation = resourceIds.stream() + .map(id -> id.replaceAll("([\\\\{}()\\[\\].+*?^$|\\-])", "\\\\$1")) + .collect(Collectors.joining("|")); + return "^(" + alternation + ")$"; + } } diff --git a/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSSConfig.java b/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSSConfig.java index 8ad2bf8..14e3176 100644 --- a/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSSConfig.java +++ b/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSSConfig.java @@ -1,3 +1,31 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2021 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2021 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + package org.opennms.timeseries.cortex; import java.util.Objects; @@ -15,6 +43,8 @@ public class CortexTSSConfig { private final long maxSeriesLookback; private final String organizationId; private final boolean hasOrganizationId; + private final boolean useLabelValuesForDiscovery; + private final int discoveryBatchSize; public CortexTSSConfig() { this(builder()); @@ -32,6 +62,8 @@ public CortexTSSConfig(Builder builder) { this.maxSeriesLookback = builder.maxSeriesLookback; this.organizationId = builder.organizationId; this.hasOrganizationId = organizationId != null && organizationId.trim().length() > 0; + this.useLabelValuesForDiscovery = builder.useLabelValuesForDiscovery; + this.discoveryBatchSize = builder.discoveryBatchSize; } /** Will be called via blueprint. The builder can be called when not running as Osgi plugin. */ @@ -45,7 +77,9 @@ public CortexTSSConfig( final long externalTagsCacheSize, final long bulkheadMaxWaitDurationInMs, final long maxSeriesLookback, - final String organizationId) { + final String organizationId, + final boolean useLabelValuesForDiscovery, + final int discoveryBatchSize) { this(builder() .writeUrl(writeUrl) .readUrl(readUrl) @@ -56,7 +90,9 @@ public CortexTSSConfig( .externalCacheSize(externalTagsCacheSize) .bulkheadMaxWaitDurationInMs(bulkheadMaxWaitDurationInMs) .maxSeriesLookback(maxSeriesLookback) - .organizationId(organizationId)); + .organizationId(organizationId) + .useLabelValuesForDiscovery(useLabelValuesForDiscovery) + .discoveryBatchSize(discoveryBatchSize)); } public String getWriteUrl() { @@ -101,6 +137,14 @@ public String getOrganizationId() { return organizationId; } + public boolean isUseLabelValuesForDiscovery() { + return useLabelValuesForDiscovery; + } + + public int getDiscoveryBatchSize() { + return discoveryBatchSize; + } + public static Builder builder() { return new Builder(); } @@ -116,6 +160,8 @@ public final static class Builder { private long bulkheadMaxWaitDurationInMs = Long.MAX_VALUE; private long maxSeriesLookback = 7776000; private String organizationId = null; + private boolean useLabelValuesForDiscovery = false; + private int discoveryBatchSize = 50; public Builder writeUrl(final String writeUrl) { this.writeUrl = writeUrl; @@ -166,6 +212,16 @@ public Builder organizationId(final String organizationId) { return this; } + public Builder useLabelValuesForDiscovery(final boolean useLabelValuesForDiscovery) { + this.useLabelValuesForDiscovery = useLabelValuesForDiscovery; + return this; + } + + public Builder discoveryBatchSize(final int discoveryBatchSize) { + this.discoveryBatchSize = Math.max(1, discoveryBatchSize); + return this; + } + public CortexTSSConfig build() { return new CortexTSSConfig(this); } @@ -184,6 +240,8 @@ public String toString() { .add("bulkheadMaxWaitDurationInMs=" + bulkheadMaxWaitDurationInMs) .add("maxSeriesLookback=" + maxSeriesLookback) .add("organizationId=" + organizationId) + .add("useLabelValuesForDiscovery=" + useLabelValuesForDiscovery) + .add("discoveryBatchSize=" + discoveryBatchSize) .toString(); } } diff --git a/plugin/src/main/java/org/opennms/timeseries/cortex/ResultMapper.java b/plugin/src/main/java/org/opennms/timeseries/cortex/ResultMapper.java index 626ccd2..0899b26 100644 --- a/plugin/src/main/java/org/opennms/timeseries/cortex/ResultMapper.java +++ b/plugin/src/main/java/org/opennms/timeseries/cortex/ResultMapper.java @@ -1,3 +1,31 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2021 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2021 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + package org.opennms.timeseries.cortex; import java.io.IOException; @@ -111,6 +139,21 @@ static Metric appendExternalTagsToMetric(final Metric metric, final KeyValueStor } else return metric; } + /** + * Parses a Prometheus label values API response. + * Expected format: {"status":"success","data":["value1","value2",...]} + * See: https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values + */ + public static List parseLabelValuesResponse(final String json) { + JSONObject response = new JSONObject(json); + JSONArray data = response.getJSONArray("data"); + List values = new ArrayList<>(data.length()); + for (int i = 0; i < data.length(); i++) { + values.add(data.getString(i)); + } + return values; + } + private static List parseMetrics(String json,KeyValueStore store) { try (JsonParser p = JSON_FACTORY.createParser(json)) { diff --git a/plugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 2368886..eb8ac2a 100644 --- a/plugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/plugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -18,6 +18,8 @@ + + @@ -32,6 +34,8 @@ + + diff --git a/plugin/src/test/java/org/opennms/timeseries/cortex/CortexTSSTest.java b/plugin/src/test/java/org/opennms/timeseries/cortex/CortexTSSTest.java index bbb8025..05ec909 100644 --- a/plugin/src/test/java/org/opennms/timeseries/cortex/CortexTSSTest.java +++ b/plugin/src/test/java/org/opennms/timeseries/cortex/CortexTSSTest.java @@ -31,6 +31,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.opennms.timeseries.cortex.CortexTSS.LABEL_NAME_PATTERN; import static org.opennms.timeseries.cortex.CortexTSS.MAX_SAMPLES; import static org.opennms.timeseries.cortex.CortexTSS.METRIC_NAME_PATTERN; @@ -38,14 +40,17 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.junit.Test; import org.opennms.integration.api.v1.timeseries.Aggregation; import org.opennms.integration.api.v1.timeseries.Tag; +import org.opennms.integration.api.v1.timeseries.TagMatcher; import org.opennms.integration.api.v1.timeseries.TimeSeriesFetchRequest; import org.opennms.integration.api.v1.timeseries.immutables.ImmutableMetric; import org.opennms.integration.api.v1.timeseries.immutables.ImmutableTag; +import org.opennms.integration.api.v1.timeseries.immutables.ImmutableTagMatcher; import org.opennms.integration.api.v1.timeseries.immutables.ImmutableTimeSeriesFetchRequest; public class CortexTSSTest { @@ -101,4 +106,81 @@ public void testTagsToQuery() { final String query = CortexTSS.tagsToQuery(tags); assertEquals("resourceId=\"response:Klatschmohnwiese:192.168.12.34:icmp\", resourceId=\"response:Klatschmohnwiese:2001\\\\:1234\\\\:1234\\\\:1234\\\\:1234\\\\:1234\\\\:1234\\\\:1234:icmp\"", query); } + + @Test + public void shouldDetectWildcardDiscoveryQuery() { + TagMatcher wildcard = ImmutableTagMatcher.builder() + .type(TagMatcher.Type.EQUALS_REGEX) + .key("resourceId") + .value("^snmp/1/.*$") + .build(); + assertTrue(CortexTSS.isWildcardDiscoveryQuery(Collections.singletonList(wildcard))); + } + + @Test + public void shouldNotDetectNonWildcardAsDiscovery() { + // Exact match + TagMatcher exact = ImmutableTagMatcher.builder() + .type(TagMatcher.Type.EQUALS) + .key("resourceId") + .value("snmp/1/eth0") + .build(); + assertFalse(CortexTSS.isWildcardDiscoveryQuery(Collections.singletonList(exact))); + + // Regex but not wildcard pattern + TagMatcher specificRegex = ImmutableTagMatcher.builder() + .type(TagMatcher.Type.EQUALS_REGEX) + .key("resourceId") + .value("^snmp/1/[^./]*$") + .build(); + assertFalse(CortexTSS.isWildcardDiscoveryQuery(Collections.singletonList(specificRegex))); + + // Regex on __name__, not resourceId + TagMatcher nameRegex = ImmutableTagMatcher.builder() + .type(TagMatcher.Type.EQUALS_REGEX) + .key("name") + .value("^something/.*$") + .build(); + assertFalse(CortexTSS.isWildcardDiscoveryQuery(Collections.singletonList(nameRegex))); + + // Multiple matchers (not a simple wildcard discovery) + assertFalse(CortexTSS.isWildcardDiscoveryQuery( + List.of(exact, specificRegex))); + } + + @Test + public void shouldRouteWildcardToLabelValuesWhenEnabled() { + CortexTSSConfig enabledConfig = CortexTSSConfig.builder() + .useLabelValuesForDiscovery(true) + .discoveryBatchSize(25) + .build(); + assertTrue(enabledConfig.isUseLabelValuesForDiscovery()); + assertEquals(25, enabledConfig.getDiscoveryBatchSize()); + + // Wildcard query should be detected + TagMatcher wildcard = ImmutableTagMatcher.builder() + .type(TagMatcher.Type.EQUALS_REGEX) + .key("resourceId") + .value("^snmp/fs/MySource/MyNode/.*$") + .build(); + assertTrue(CortexTSS.isWildcardDiscoveryQuery(Collections.singletonList(wildcard))); + } + + @Test + public void shouldFallBackWhenDisabled() { + CortexTSSConfig disabledConfig = CortexTSSConfig.builder() + .useLabelValuesForDiscovery(false) + .build(); + assertFalse(disabledConfig.isUseLabelValuesForDiscovery()); + } + + @Test + public void shouldBuildBatchRegex() { + List resourceIds = List.of( + "snmp/1/eth0/mib2-interfaces", + "snmp/1/eth1/mib2-interfaces", + "snmp/1/nodeSnmp"); + String regex = CortexTSS.buildBatchResourceIdRegex(resourceIds); + assertEquals("^(snmp/1/eth0/mib2\\-interfaces|snmp/1/eth1/mib2\\-interfaces|snmp/1/nodeSnmp)$", regex); + } } diff --git a/plugin/src/test/java/org/opennms/timeseries/cortex/KVStoreMock.java b/plugin/src/test/java/org/opennms/timeseries/cortex/KVStoreMock.java index e7f199d..580d9bc 100644 --- a/plugin/src/test/java/org/opennms/timeseries/cortex/KVStoreMock.java +++ b/plugin/src/test/java/org/opennms/timeseries/cortex/KVStoreMock.java @@ -1,3 +1,31 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2021 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2021 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + package org.opennms.timeseries.cortex; import org.opennms.integration.api.v1.distributed.KeyValueStore; diff --git a/plugin/src/test/java/org/opennms/timeseries/cortex/PushSample.java b/plugin/src/test/java/org/opennms/timeseries/cortex/PushSample.java index 350fa22..b28d54c 100644 --- a/plugin/src/test/java/org/opennms/timeseries/cortex/PushSample.java +++ b/plugin/src/test/java/org/opennms/timeseries/cortex/PushSample.java @@ -1,3 +1,31 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2021 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2021 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ + package org.opennms.timeseries.cortex; import java.time.Instant; diff --git a/plugin/src/test/java/org/opennms/timeseries/cortex/ResultMapperTest.java b/plugin/src/test/java/org/opennms/timeseries/cortex/ResultMapperTest.java index 212c066..56ec5fc 100644 --- a/plugin/src/test/java/org/opennms/timeseries/cortex/ResultMapperTest.java +++ b/plugin/src/test/java/org/opennms/timeseries/cortex/ResultMapperTest.java @@ -1,3 +1,30 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2021 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2021 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ package org.opennms.timeseries.cortex; import static org.junit.Assert.assertEquals; @@ -82,6 +109,23 @@ public void testAppendExternalTagsToMetric() throws IOException, URISyntaxExcept .getValue()); } + @Test + public void shouldParseLabelValuesResult() throws IOException, URISyntaxException { + String json = readStringFromFile("labelValuesResult.json"); + List values = ResultMapper.parseLabelValuesResponse(json); + assertEquals(3, values.size()); + assertEquals("snmp/1/eth0/mib2-interfaces", values.get(0)); + assertEquals("snmp/1/eth1/mib2-interfaces", values.get(1)); + assertEquals("snmp/1/nodeSnmp", values.get(2)); + } + + @Test + public void shouldParseEmptyLabelValuesResult() { + String json = "{\"status\":\"success\",\"data\":[]}"; + List values = ResultMapper.parseLabelValuesResponse(json); + assertEquals(0, values.size()); + } + private String readStringFromFile(final String fileName) throws IOException, URISyntaxException { StringBuilder contentBuilder = new StringBuilder(); try (Stream stream = Files.lines( diff --git a/plugin/src/test/resources/org/opennms/timeseries/cortex/labelValuesResult.json b/plugin/src/test/resources/org/opennms/timeseries/cortex/labelValuesResult.json new file mode 100644 index 0000000..80dde5c --- /dev/null +++ b/plugin/src/test/resources/org/opennms/timeseries/cortex/labelValuesResult.json @@ -0,0 +1,8 @@ +{ + "status": "success", + "data": [ + "snmp/1/eth0/mib2-interfaces", + "snmp/1/eth1/mib2-interfaces", + "snmp/1/nodeSnmp" + ] +}