Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions pkg/providers/netq/netq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// URL paths of the NetQ endpoints used by this provider.
const (
	// LoginURL is the path of the login endpoint.
	LoginURL = "auth/v1/login"
	// OpIdURL is the path of the "select opid" endpoint.
	OpIdURL = "auth/v1/select/opid"
	// TopologyURL is the path of the fetch-topology endpoint.
	TopologyURL = "api/netq/telemetry/v1/object/topologygraph/fetch-topology"
)

type NetqResponse struct {
Expand All @@ -49,7 +49,7 @@ type AuthOutput struct {
AccessToken string `json:"access_token"`
}

func (p *Provider) generateTopologyConfig(ctx context.Context, cis []topology.ComputeInstances) (*topology.Vertex, *httperr.Error) {
func (p *Provider) getNetworkTree(ctx context.Context, cis []topology.ComputeInstances) (*topology.Vertex, *httperr.Error) {
// 1. login to NetQ server
payload := strings.NewReader(fmt.Sprintf(`{"username":%q, "password":%q}`, p.cred.user, p.cred.passwd))
headers := map[string]string{
Expand Down Expand Up @@ -242,9 +242,5 @@ func parseNetq(resp []NetqResponse, inputNodes map[string]bool) (*topology.Verte
treeRoot.Vertices[node.ID] = node
}

root := &topology.Vertex{
Vertices: map[string]*topology.Vertex{topology.TopologyTree: treeRoot},
}

return root, nil
return treeRoot, nil
}
4 changes: 2 additions & 2 deletions pkg/providers/netq/netq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestParseNetq(t *testing.T) {
Nodes: nodes,
}}

root, err := parseNetq(netqResponse, map[string]bool{"A": true})
treeRoot, err := parseNetq(netqResponse, map[string]bool{"A": true})
require.Nil(t, err)

top := []*topology.Vertex{}
for _, v := range root.Vertices[topology.TopologyTree].Vertices {
for _, v := range treeRoot.Vertices {
top = append(top, v)
}

Expand Down
71 changes: 71 additions & 0 deletions pkg/providers/netq/nmx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2025 NVIDIA CORPORATION
* SPDX-License-Identifier: Apache-2.0
*/

package netq

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"

"k8s.io/klog/v2"

"github.com/NVIDIA/topograph/internal/httperr"
"github.com/NVIDIA/topograph/internal/httpreq"
"github.com/NVIDIA/topograph/pkg/topology"
)

// ComputeURL is the URL path of the NMX compute-nodes endpoint.
const ComputeURL = "nmx/v1/compute-nodes"

// ComputeNode holds the fields decoded from one element of the compute-nodes
// JSON array.
type ComputeNode struct {
	// Id is decoded from the "ID" key.
	Id string `json:"ID"`
	// Name is decoded from the "Name" key.
	Name string `json:"Name"`
	// DomainUUID is decoded from the "DomainUUID" key.
	DomainUUID string `json:"DomainUUID"`
}

// getNvlDomains requests the compute-node list and turns the response body
// into a domain map with parseComputeNodes.
//
// A failure to build the request URL or to perform the GET request is
// returned unchanged with a nil map.
func (p *Provider) getNvlDomains(ctx context.Context) (topology.DomainMap, *httperr.Error) {
	endpoint, hdrs, urlErr := p.getComputeUrl()
	if urlErr != nil {
		return nil, urlErr
	}
	klog.V(4).Infof("Fetching %s", endpoint)

	_, body, reqErr := httpreq.DoRequest(getRequestFunc(ctx, "GET", endpoint, hdrs, nil), true)
	if reqErr != nil {
		return nil, reqErr
	}
	return parseComputeNodes(body)
}

// getComputeUrl returns the compute-nodes request URL together with the
// request headers carrying HTTP Basic credentials.
//
// The Authorization value is "Basic " followed by the standard base64
// encoding of "<user>:<passwd>" taken from the provider credentials. The URL
// is built by httpreq.GetURL from the configured ApiURL and ComputeURL; its
// error, if any, is passed through as is, alongside the populated headers.
func (p *Provider) getComputeUrl() (string, map[string]string, *httperr.Error) {
	credentials := []byte(p.cred.user + ":" + p.cred.passwd)
	hdrs := map[string]string{
		"Authorization": "Basic " + base64.StdEncoding.EncodeToString(credentials),
	}

	endpoint, err := httpreq.GetURL(p.params.ApiURL, nil, ComputeURL)
	return endpoint, hdrs, err
}

// parseComputeNodes decodes data as a JSON array of ComputeNode and registers
// every node in a new domain map under its DomainUUID, passing the node name
// for both remaining AddHost arguments.
//
// If data cannot be decoded, it returns a nil map and a 502 (Bad Gateway)
// error describing the failure.
func parseComputeNodes(data []byte) (topology.DomainMap, *httperr.Error) {
	var nodes []ComputeNode
	if err := json.Unmarshal(data, &nodes); err != nil {
		msg := fmt.Sprintf("nmx output read failed: %v", err)
		return nil, httperr.NewError(http.StatusBadGateway, msg)
	}

	domains := topology.NewDomainMap()
	for i := range nodes {
		uuid, name := nodes[i].DomainUUID, nodes[i].Name
		klog.V(4).Infof("Add NVL domain %q for node %q", uuid, name)
		domains.AddHost(uuid, name, name)
	}
	return domains, nil
}
70 changes: 70 additions & 0 deletions pkg/providers/netq/nmx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2025 NVIDIA CORPORATION
* SPDX-License-Identifier: Apache-2.0
*/

package netq

import (
"os"
"testing"

"github.com/NVIDIA/topograph/pkg/topology"
"github.com/stretchr/testify/require"
)

// TestGetComputeUrl runs getComputeUrl against a malformed and a well-formed
// API URL and checks the returned error, URL and Authorization header.
func TestGetComputeUrl(t *testing.T) {
	type testCase struct {
		name        string
		serverURL   string
		wantHeaders map[string]string
		wantURL     string
		wantErr     string
	}

	cases := []testCase{
		{
			name:      "Case 1: invalid URL",
			serverURL: `:///server`,
			wantErr:   `parse ":///server": missing protocol scheme`,
		},
		{
			name:        "Case 2: valid input",
			serverURL:   `https://server.com`,
			wantHeaders: map[string]string{"Authorization": "Basic dXNlcjpwYXNzd2Q="},
			wantURL:     `https://server.com/nmx/v1/compute-nodes`,
		},
	}

	// A single provider is shared by all cases; only ApiURL changes per case.
	provider := &Provider{
		params: &ProviderParams{},
		cred:   &Credentials{user: "user", passwd: "passwd"},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			provider.params.ApiURL = c.serverURL
			gotURL, gotHeaders, err := provider.getComputeUrl()

			if c.wantErr != "" {
				require.NotNil(t, err)
				require.EqualError(t, err, c.wantErr)
				return
			}

			require.Nil(t, err)
			require.Equal(t, c.wantURL, gotURL)
			require.Equal(t, c.wantHeaders, gotHeaders)
		})
	}
}

// TestParseComputeNodes parses the computeNodes.json fixture and checks that
// both nodes are registered under their shared domain UUID.
func TestParseComputeNodes(t *testing.T) {
	data, err := os.ReadFile("../../../tests/output/netq/computeNodes.json")
	require.NoError(t, err)

	// Keep the *httperr.Error result in its own variable: assigning it to the
	// `error`-typed err above would wrap a nil pointer in a non-nil interface.
	domains, httpErr := parseComputeNodes(data)
	require.Nil(t, httpErr)

	expected := topology.NewDomainMap()
	expected.AddHost("3fbfc98b-7f95-4749-ab11-bd351a2aab3e", "node1", "node1")
	expected.AddHost("3fbfc98b-7f95-4749-ab11-bd351a2aab3e", "node2", "node2")

	require.Equal(t, expected, domains)
}
24 changes: 23 additions & 1 deletion pkg/providers/netq/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"fmt"
"net/http"

"k8s.io/klog/v2"

"github.com/NVIDIA/topograph/internal/config"
"github.com/NVIDIA/topograph/internal/httperr"
"github.com/NVIDIA/topograph/pkg/providers"
Expand Down Expand Up @@ -85,7 +87,27 @@ func getParams(params map[string]any) (*ProviderParams, error) {
}

// GenerateTopologyConfig builds the topology for the given compute instances.
//
// The returned root vertex always contains the network tree under
// topology.TopologyTree. When NVL domains could be fetched, their blocks are
// added under topology.TopologyBlock. The second parameter is ignored.
//
// An error is returned only when the network tree cannot be obtained.
func (p *Provider) GenerateTopologyConfig(ctx context.Context, _ *int, instances []topology.ComputeInstances) (*topology.Vertex, *httperr.Error) {
	// NVL domain discovery is best-effort: a failure is logged and the
	// topology is produced without the block section.
	domains, err := p.getNvlDomains(ctx)
	if err != nil {
		klog.Warningf("Failed to get NVL domains: %v", err)
	}

	treeRoot, err := p.getNetworkTree(ctx, instances)
	if err != nil {
		return nil, err
	}

	root := &topology.Vertex{
		Vertices: map[string]*topology.Vertex{
			topology.TopologyTree: treeRoot,
		},
	}

	if domains != nil {
		root.Vertices[topology.TopologyBlock] = domains.ToBlocks()
	}

	return root, nil
}

// Instances2NodeMap implements slurm.instanceMapper
Expand Down
46 changes: 46 additions & 0 deletions tests/output/netq/computeNodes.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[
{
"CreatedAt": "2025-11-04T16:45:16.482Z",
"Description": "Test",
"DomainUUID": "3fbfc98b-7f95-4749-ab11-bd351a2aab3e",
"GpuIDList": [
"690a2d9cc5e905fa41a87cdc",
"690a2d9cc5e905fa41a87cdd",
"690a2d9cc5e905fa41a87cde",
"690a2d9cc5e905fa41a87cdf"
],
"Health": "HEALTHY",
"ID": "690a2d9cc5e905fa41a87d79",
"LocationInfo": {
"ChassisID": 1,
"ChassisSerialNumber": "1825024170029",
"HostID": 1,
"SlotID": 5,
"TrayIndex": 4
},
"Name": "node1",
"UpdatedAt": "2025-11-05T18:37:47.155Z"
},
{
"CreatedAt": "2025-11-04T16:45:16.482Z",
"Description": "",
"DomainUUID": "3fbfc98b-7f95-4749-ab11-bd351a2aab3e",
"GpuIDList": [
"690a2d9cc5e905fa41a87ce0",
"690a2d9cc5e905fa41a87ce2",
"690a2d9cc5e905fa41a87ce3",
"690a2d9cc5e905fa41a87ce1"
],
"Health": "HEALTHY",
"ID": "690a2d9cc5e905fa41a87d7a",
"LocationInfo": {
"ChassisID": 1,
"ChassisSerialNumber": "1825024170029",
"HostID": 1,
"SlotID": 22,
"TrayIndex": 12
},
"Name": "node2",
"UpdatedAt": "2025-11-05T18:37:47.155Z"
}
]