diff --git a/README.md b/README.md index 3f6fa483..cf402e3f 100644 --- a/README.md +++ b/README.md @@ -219,6 +219,11 @@ Environment variables: | `MG_AGENT_BOOTSTRAP_RETRIES` | Bootstrap fetch retries | `5` | | `MG_AGENT_BOOTSTRAP_RETRY_DELAY_SECONDS` | Bootstrap retry delay in seconds | `10` | | `MG_AGENT_BOOTSTRAP_SKIP_TLS` | Skip TLS verification for bootstrap fetch | `false` | +| `MG_AGENT_CLIENTS_URL` | Magistrala Clients API URL for device provisioning | | +| `MG_AGENT_CHANNELS_URL` | Magistrala Channels API URL for device provisioning | | +| `MG_AGENT_RULES_ENGINE_URL` | Magistrala Rules Engine URL (optional, for save_senml) | | +| `MG_PAT` | Magistrala Personal Access Token for provisioning | | +| `MG_AGENT_DEVICE_DB_PATH` | BoltDB file path for the device registry | `/var/lib/agent/devices.db` | ## MQTT Message Format @@ -239,6 +244,7 @@ The `n` field selects the subsystem. Supported subsystems: | `config` | View runtime config or save export service config | | `term` | Terminal session control | | `nodered` | Node-RED flow management | +| `devices` | Downstream device registry management | ## Sending Commands @@ -331,6 +337,143 @@ In both cases `flows` is the flow JSON **base64-encoded**. The agent automatical See [docs/nodered.md](docs/nodered.md) for the full setup guide, Docker Compose stack, and provisioning instructions. +## Device Manager + +The agent maintains a registry of downstream devices (Serial/Modbus, I2C, BLE, USB CDC) backed by a persistent BoltDB file. Each device is provisioned as a Magistrala Client and Channel pair, enabling it to publish telemetry independently through the gateway's MQTT connection. + +### Configuration + +| Variable | Description | Default | +| --------------------------- | ------------------------------------------------------ | --------------------------- | +| `MG_AGENT_CLIENTS_URL` | Magistrala Clients API base URL | | +| `MG_AGENT_CHANNELS_URL` | Magistrala Channels API base URL | | +| `MG_AGENT_RULES_ENGINE_URL` | Magistrala Rules Engine URL (creates `save_senml` rule)| | +| `MG_PAT` | Magistrala Personal Access Token | | +| `MG_AGENT_DEVICE_DB_PATH` | BoltDB path for the device registry | `/var/lib/agent/devices.db` | + +When `MG_PAT` is set via bootstrap config, it is stored in the config store and does not need to be set as an env var on subsequent restarts. + +### REST API + +#### Add a device + +Provisions a new Magistrala Client + Channel for the device and persists it to the registry. The returned `id` and `key` are the device's Magistrala credentials. + +```bash +curl -s -X POST http://localhost:9999/devices \ + -H 'Content-Type: application/json' \ + -d '{ + "name": "temperature-sensor-01", + "ext_id": "sensor-hw-id-001", + "ext_key": "sensor-hw-secret-001", + "interface_type": "serial", + "interface_addr": "/dev/ttyUSB0" + }' +``` + +Supported `interface_type` values: `serial`, `i2c`, `ble`, `usb`, `modbus_rtu`, `modbus_tcp`. + +Response (`201 Created`): + +```json +{ + "id": "", + "key": "", + "channel_id": "", + "name": "temperature-sensor-01", + "interface_type": "serial", + "interface_addr": "/dev/ttyUSB0", + "active": false, + "last_seen": "0001-01-01T00:00:00Z" +} +``` + +#### List devices + +```bash +curl -s http://localhost:9999/devices +``` + +Response (`200 OK`): + +```json +{ + "devices": [ + { + "id": "", + "key": "", + "channel_id": "", + "name": "temperature-sensor-01", + "interface_type": "serial", + "interface_addr": "/dev/ttyUSB0", + "active": true, + "last_seen": "2024-01-15T10:30:00Z" + } + ] +} +``` + +#### Get a device + +```bash +curl -s http://localhost:9999/devices/ +``` + +#### Remove a device + +```bash +curl -s -X DELETE http://localhost:9999/devices/ +``` + +Stops the device telemetry goroutine and removes the device from the local registry. The Magistrala Client and Channel are **not** deleted from Magistrala. + +#### Mark device seen + +Manually update the `last_seen` timestamp (useful for devices that communicate out-of-band): + +```bash +curl -s -X POST http://localhost:9999/devices//seen +``` + +### MQTT Commands (via Magistrala) + +The `devices` subsystem is invoked via `n: "devices"` on the commands channel. The `vs` field carries a subcommand, optionally followed by a comma-separated argument or a JSON payload. + +```bash +mosquitto_pub \ + -h -p 8883 --capath /etc/ssl/certs \ + -u -P --id "cmd-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"devices", "vs":"[,]"}]' +``` + +Supported subcommands: + +| Subcommand | `vs` format | Description | +| ---------- | ----------- | ----------- | +| `list` | `list` | Return JSON array of all devices | +| `get` | `get,` | Return JSON for one device | +| `add` | `add,{"name":"...","external_id":"...","external_key":"...","iface_type":"...","iface_addr":"..."}` | Provision and register a device | +| `remove` | `remove,` | Deregister a device | +| `seen` | `seen,` | Mark device last-seen | +| `open` | `open,` | Open the physical interface | +| `close` | `close,` | Close the physical interface | +| `read` | `read,,` | Read n bytes from interface (reply as hex) | +| `write` | `write,,` | Write hex bytes to interface | + +Responses are published to `m//c//res`. + +### Telemetry Scheduling + +Once registered, each device with a non-empty `channel_id` gets a dedicated telemetry goroutine. The goroutine: + +1. Opens a separate MQTT connection authenticated as the device (using its Magistrala `id`/`key`). +2. Opens the physical interface (`/dev/ttyUSB0`, I2C bus, etc.). +3. Reads bytes from the interface and publishes raw payloads to `m//c//msg`. +4. Marks the device as `active` and updates `last_seen` after each successful publish. + +The goroutine reconnects with exponential backoff (1 s → 30 s) after any failure. Goroutines are restored from the persistent registry on agent restart, so devices survive process restarts without re-provisioning. + ## Heartbeat Service Services running on the same host can publish to `heartbeat..` to register with the agent. diff --git a/ui/package.json b/ui/package.json index 1dfca588..67aea97e 100644 --- a/ui/package.json +++ b/ui/package.json @@ -5,7 +5,7 @@ "private": true, "scripts": { "dev": "vite", - "build": "tsc -b && vite build", + "build": "tsc -b && vite build && touch dist/.gitkeep dist/assets/.gitkeep", "preview": "vite preview", "lint": "biome check --error-on-warnings src", "lint:fix": "biome check --write src", diff --git a/ui/src/pages/devices.tsx b/ui/src/pages/devices.tsx index 47f51f1a..cdaa0961 100644 --- a/ui/src/pages/devices.tsx +++ b/ui/src/pages/devices.tsx @@ -2,18 +2,22 @@ // SPDX-License-Identifier: Apache-2.0 import { + ArrowUpDown, Bluetooth, Cable, + ChevronLeft, + ChevronRight, Copy, Cpu, Loader2, Plus, RefreshCw, + Search, Trash2, Usb, Wifi, } from "lucide-react"; -import { useEffect, useState } from "preact/hooks"; +import { useEffect, useMemo, useState } from "preact/hooks"; import { Button } from "@/components/ui/button"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Input } from "@/components/ui/input"; @@ -30,6 +34,10 @@ interface Device { channel_id: string; } +type SortKey = "active" | "name" | "last_seen"; + +const PAGE_SIZE = 20; + const IFACE_OPTIONS = [ { value: "ble", label: "BLE", addrPlaceholder: "AA:BB:CC:DD:EE:FF" }, { value: "serial", label: "Serial", addrPlaceholder: "/dev/ttyUSB0" }, @@ -88,11 +96,42 @@ async function extractError(res: Response): Promise { } } +function sortDevices(devices: Device[], key: SortKey, asc: boolean): Device[] { + return [...devices].sort((a, b) => { + let cmp = 0; + switch (key) { + case "active": + cmp = (b.active ? 1 : 0) - (a.active ? 1 : 0); + break; + case "name": + cmp = a.name.localeCompare(b.name); + break; + case "last_seen": { + const ta = + a.last_seen && !a.last_seen.startsWith("0001") + ? new Date(a.last_seen).getTime() + : 0; + const tb = + b.last_seen && !b.last_seen.startsWith("0001") + ? new Date(b.last_seen).getTime() + : 0; + cmp = tb - ta; + break; + } + } + return asc ? cmp : -cmp; + }); +} + export function DevicesPage() { const [devices, setDevices] = useState([]); const [loading, setLoading] = useState(false); const [error, setError] = useState(""); const [showAdd, setShowAdd] = useState(false); + const [query, setQuery] = useState(""); + const [sortKey, setSortKey] = useState("active"); + const [sortAsc, setSortAsc] = useState(true); + const [page, setPage] = useState(0); const [form, setForm] = useState({ name: "", @@ -112,6 +151,7 @@ export function DevicesPage() { if (!res.ok) throw new Error(await extractError(res)); const data = await res.json(); setDevices(data.devices ?? []); + setPage(0); } catch (e) { setError(String(e)); } finally { @@ -123,6 +163,39 @@ export function DevicesPage() { load(); }, []); + const filtered = useMemo(() => { + const q = query.trim().toLowerCase(); + const base = q + ? devices.filter( + (d) => + d.name.toLowerCase().includes(q) || + d.interface_addr.toLowerCase().includes(q) || + d.interface_type.toLowerCase().includes(q) || + d.id.toLowerCase().includes(q), + ) + : devices; + return sortDevices(base, sortKey, sortAsc); + }, [devices, query, sortKey, sortAsc]); + + const totalPages = Math.ceil(filtered.length / PAGE_SIZE); + const pageDevices = filtered.slice(page * PAGE_SIZE, (page + 1) * PAGE_SIZE); + const activeCount = devices.filter((d) => d.active).length; + + function cycleSort(key: SortKey) { + if (sortKey === key) { + setSortAsc((a) => !a); + } else { + setSortKey(key); + setSortAsc(true); + } + setPage(0); + } + + function handleQueryChange(e: Event) { + setQuery((e.target as HTMLInputElement).value); + setPage(0); + } + async function handleAdd(e: Event) { e.preventDefault(); setAdding(true); @@ -134,6 +207,8 @@ export function DevicesPage() { body: JSON.stringify(form), }); if (!res.ok) throw new Error(await extractError(res)); + const added: Device = await res.json(); + setDevices((prev) => [added, ...prev]); setShowAdd(false); setForm({ name: "", @@ -142,7 +217,6 @@ export function DevicesPage() { interface_type: "ble", interface_addr: "", }); - await load(); } catch (e) { setAddError(String(e)); } finally { @@ -161,7 +235,7 @@ export function DevicesPage() { try { const res = await fetch(`/devices/${id}`, { method: "DELETE" }); if (!res.ok) throw new Error(await extractError(res)); - setDevices((d) => d.filter((x) => x.id !== id)); + setDevices((prev) => prev.filter((x) => x.id !== id)); } catch (e) { setError(String(e)); } @@ -169,22 +243,37 @@ export function DevicesPage() { async function handleSeen(id: string) { try { - await fetch(`/devices/${id}/seen`, { method: "POST" }); - await load(); + const res = await fetch(`/devices/${id}/seen`, { method: "POST" }); + if (!res.ok) throw new Error(await extractError(res)); + // Update only the affected device in-place rather than re-fetching everything. + setDevices((prev) => + prev.map((d) => + d.id === id ? { ...d, last_seen: new Date().toISOString() } : d, + ), + ); } catch (e) { setError(String(e)); } } + const sortLabel: Record = { + active: "Status", + name: "Name", + last_seen: "Last seen", + }; + return (
+ {/* Header */}

Devices

- Downstream devices registered with this gateway. + {devices.length === 0 + ? "No devices registered." + : `${devices.length} device${devices.length !== 1 ? "s" : ""} · ${activeCount} active`}

@@ -203,6 +292,7 @@ export function DevicesPage() {
+ {/* Add form */} {showAdd && ( @@ -339,20 +429,63 @@ export function DevicesPage() {
)} + {/* Search + sort toolbar */} + {devices.length > 0 && ( +
+
+ + +
+
+ + Sort: +
+ {(["active", "name", "last_seen"] as SortKey[]).map((k) => ( + + ))} +
+ )} + + {/* Device list */}
- {devices.length === 0 ? ( + {filtered.length === 0 ? (
-

- No devices registered -

-

- Add a downstream device to get started. -

+ {devices.length === 0 ? ( + <> +

+ No devices registered +

+

+ Add a downstream device to get started. +

+ + ) : ( +

+ No devices match "{query}" +

+ )}
) : ( - devices.map((d) => ( + pageDevices.map((d) => (
+ + {/* Pagination */} + {totalPages > 1 && ( +
+ + {page * PAGE_SIZE + 1}– + {Math.min((page + 1) * PAGE_SIZE, filtered.length)} of{" "} + {filtered.length} + +
+ + + {page + 1} / {totalPages} + + +
+
+ )}
);