|
4 | 4 | "bytes"
|
5 | 5 | "context"
|
6 | 6 | "crypto/tls"
|
7 |
| - "encoding/base64" |
8 | 7 | "encoding/json"
|
9 | 8 | "fmt"
|
10 | 9 | "io"
|
@@ -104,20 +103,76 @@ func newFleetConfigManager(ctx context.Context, logger *slog.Logger, pMgr policy
|
104 | 103 | }
|
105 | 104 |
|
106 | 105 | func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error {
|
| 106 | + ctx, cancel := context.WithCancel(context.Background()) |
| 107 | + defer cancel() |
| 108 | + |
107 | 109 | // call the token url to get the token
|
108 |
| - token, err := fleetManager.getToken(cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret) |
| 110 | + token, err := fleetManager.getToken(ctx, cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret) |
109 | 111 | if err != nil {
|
110 | 112 | return err
|
111 | 113 | }
|
112 | 114 |
|
113 |
| - // use the token to connect over MQTT v5 |
114 |
| - err = fleetManager.connect(token.MQTTURL, token.AccessToken, token.Topics, backends) |
| 115 | + // merge configuration values with token response values (config takes priority) |
| 116 | + mqttURL, topics := fleetManager.mergeConfigWithTokenResponse(cfg.OrbAgent.ConfigManager.Sources.Fleet, token) |
| 117 | + |
| 118 | + // use the merged configuration to connect over MQTT v5 |
| 119 | + err = fleetManager.connect(mqttURL, token.AccessToken, topics, backends) |
115 | 120 | if err != nil {
|
116 | 121 | return err
|
117 | 122 | }
|
118 | 123 | return nil
|
119 | 124 | }
|
120 | 125 |
|
| 126 | +// mergeConfigWithTokenResponse merges configuration values with token response values, |
| 127 | +// giving priority to token response values when they are provided |
| 128 | +func (fleetManager *fleetConfigManager) mergeConfigWithTokenResponse(fleetCfg config.FleetManager, token *tokenResponse) (string, tokenResponseTopics) { |
| 129 | + // Start with configuration values as defaults |
| 130 | + mqttURL := fleetCfg.MQTTURL |
| 131 | + topics := tokenResponseTopics{ |
| 132 | + Heartbeat: fleetCfg.HeartbeatTopic, |
| 133 | + Capabilities: fleetCfg.CapabilitiesTopic, |
| 134 | + } |
| 135 | + |
| 136 | + // Handle legacy TopicName field for backward compatibility - only if specific topics aren't set |
| 137 | + if fleetCfg.TopicName != "" && fleetCfg.HeartbeatTopic == "" && fleetCfg.CapabilitiesTopic == "" { |
| 138 | + fleetManager.logger.Debug("using legacy topic name as base for heartbeat and capabilities", "topic", fleetCfg.TopicName) |
| 139 | + topics.Heartbeat = fleetCfg.TopicName + "/heartbeat" |
| 140 | + topics.Capabilities = fleetCfg.TopicName + "/capabilities" |
| 141 | + } |
| 142 | + |
| 143 | + // Override with token response values if provided (token takes priority) |
| 144 | + if token.MQTTURL != "" { |
| 145 | + fleetManager.logger.Debug("using MQTT URL from token response", "token_url", token.MQTTURL, "config_url", fleetCfg.MQTTURL) |
| 146 | + mqttURL = token.MQTTURL |
| 147 | + } else if mqttURL != "" { |
| 148 | + fleetManager.logger.Debug("using MQTT URL from configuration", "config_url", mqttURL) |
| 149 | + } |
| 150 | + |
| 151 | + // Token response topics override configuration topics |
| 152 | + if token.Topics.Heartbeat != "" { |
| 153 | + fleetManager.logger.Debug("using heartbeat topic from token response", "token_topic", token.Topics.Heartbeat, "config_topic", topics.Heartbeat) |
| 154 | + topics.Heartbeat = token.Topics.Heartbeat |
| 155 | + } |
| 156 | + |
| 157 | + if token.Topics.Capabilities != "" { |
| 158 | + fleetManager.logger.Debug("using capabilities topic from token response", "token_topic", token.Topics.Capabilities, "config_topic", topics.Capabilities) |
| 159 | + topics.Capabilities = token.Topics.Capabilities |
| 160 | + } |
| 161 | + |
| 162 | + // Token response always provides inbox/outbox topics |
| 163 | + topics.Inbox = token.Topics.Inbox |
| 164 | + topics.Outbox = token.Topics.Outbox |
| 165 | + |
| 166 | + fleetManager.logger.Info("merged configuration and token response", |
| 167 | + "mqtt_url", mqttURL, |
| 168 | + "heartbeat_topic", topics.Heartbeat, |
| 169 | + "capabilities_topic", topics.Capabilities, |
| 170 | + "inbox_topic", topics.Inbox, |
| 171 | + "outbox_topic", topics.Outbox) |
| 172 | + |
| 173 | + return mqttURL, topics |
| 174 | +} |
| 175 | + |
121 | 176 | func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend) error {
|
122 | 177 | // Parse the ORB URL
|
123 | 178 | serverURL, err := url.Parse(fleetMQTTURL)
|
@@ -314,64 +369,94 @@ type tokenResponse struct {
|
314 | 369 | ExpiresIn int `json:"expires_in"`
|
315 | 370 | }
|
316 | 371 |
|
317 |
| -// getTokenWithContext is the internal implementation that obeys the supplied |
318 |
| -// context for cancellation. It was introduced so that production code can be |
319 |
| -// context-aware while preserving the original test helper signature. |
320 |
| -func (fleetManager *fleetConfigManager) getTokenWithContext(ctx context.Context, tokenURL string, clientID string, clientSecret string) (*tokenResponse, error) { |
| 372 | +func (fleetManager *fleetConfigManager) getToken(ctx context.Context, tokenURL string, clientID string, clientSecret string) (*tokenResponse, error) { |
| 373 | + // Input validation |
| 374 | + if tokenURL == "" { |
| 375 | + return nil, fmt.Errorf("token URL cannot be empty") |
| 376 | + } |
| 377 | + if clientID == "" { |
| 378 | + return nil, fmt.Errorf("client ID cannot be empty") |
| 379 | + } |
| 380 | + if clientSecret == "" { |
| 381 | + return nil, fmt.Errorf("client secret cannot be empty") |
| 382 | + } |
| 383 | + |
| 384 | + fleetManager.logger.Debug("requesting access token", "token_url", tokenURL, "client_id", clientID) |
| 385 | + |
321 | 386 | scopes := []string{
|
322 |
| - "rabbitmq.read:*/*", |
323 |
| - "rabbitmq.write:*/*", |
324 |
| - "rabbitmq.configure:*/*", |
| 387 | + "orb.read:*/*/*", |
| 388 | + "orb.write:*/*/*", |
| 389 | + "orb.configure:*/*/*", |
325 | 390 | }
|
326 | 391 |
|
327 | 392 | data := url.Values{}
|
328 | 393 | data.Set("grant_type", "client_credentials")
|
329 | 394 | data.Set("scope", strings.Join(scopes, " "))
|
| 395 | + data.Set("client_id", clientID) |
| 396 | + data.Set("client_secret", clientSecret) |
330 | 397 |
|
331 |
| - // Encode credentials in Basic Auth header |
332 |
| - creds := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", clientID, clientSecret))) |
| 398 | + fleetManager.logger.Debug("sending token request", "url", tokenURL, "data", data, "client_id", clientID) //, "client_secret", clientSecret) |
333 | 399 |
|
334 | 400 | req, err := http.NewRequest("POST", tokenURL, bytes.NewBufferString(data.Encode()))
|
335 | 401 | if err != nil {
|
| 402 | + fleetManager.logger.Error("failed to create token request", "error", err, "token_url", tokenURL) |
336 | 403 | return nil, fmt.Errorf("failed to create request: %w", err)
|
337 | 404 | }
|
338 | 405 | req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
339 |
| - req.Header.Set("Authorization", "Basic "+creds) |
340 | 406 |
|
341 |
| - // HTTP client with TLS verification disabled |
| 407 | + // HTTP client with configurable timeout and TLS settings |
342 | 408 | httpClient := &http.Client{
|
| 409 | + Timeout: 30 * time.Second, // TODO: make configurable |
343 | 410 | Transport: &http.Transport{
|
344 |
| - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // TODO: make configurable? |
| 411 | + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // TODO: make configurable |
345 | 412 | },
|
346 | 413 | }
|
347 | 414 |
|
| 415 | + fleetManager.logger.Debug("sending token request", "url", tokenURL) |
348 | 416 | resp, err := httpClient.Do(req.WithContext(ctx))
|
349 | 417 | if err != nil {
|
350 |
| - return nil, fmt.Errorf("failed to send request: %w", err) |
| 418 | + fleetManager.logger.Error("failed to send token request", "error", err, "token_url", tokenURL) |
| 419 | + return nil, fmt.Errorf("failed to send request to %s: %w", tokenURL, err) |
351 | 420 | }
|
352 | 421 | defer func() {
|
353 | 422 | if err := resp.Body.Close(); err != nil {
|
354 | 423 | fleetManager.logger.Error("failed to close response body", "error", err)
|
355 | 424 | }
|
356 | 425 | }()
|
357 | 426 |
|
358 |
| - body, _ := io.ReadAll(resp.Body) |
| 427 | + body, err := io.ReadAll(resp.Body) |
| 428 | + if err != nil { |
| 429 | + fleetManager.logger.Error("failed to read response body", "error", err, "status_code", resp.StatusCode) |
| 430 | + return nil, fmt.Errorf("failed to read response body: %w", err) |
| 431 | + } |
| 432 | + |
359 | 433 | if resp.StatusCode != 200 {
|
360 |
| - return nil, fmt.Errorf("token request failed: %s", body) |
| 434 | + fleetManager.logger.Error("token request failed", |
| 435 | + "status_code", resp.StatusCode, |
| 436 | + "response", string(body), |
| 437 | + "token_url", tokenURL, |
| 438 | + "client_id", clientID) |
| 439 | + return nil, fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, string(body)) |
361 | 440 | }
|
362 | 441 |
|
363 | 442 | var tokenResponse tokenResponse
|
364 | 443 | if err := json.Unmarshal(body, &tokenResponse); err != nil {
|
| 444 | + fleetManager.logger.Error("failed to parse token response", "error", err, "response", string(body)) |
365 | 445 | return nil, fmt.Errorf("failed to parse token response: %w", err)
|
366 | 446 | }
|
367 | 447 |
|
368 |
| - return &tokenResponse, nil |
369 |
| -} |
| 448 | + // Validate token response |
| 449 | + if tokenResponse.AccessToken == "" { |
| 450 | + fleetManager.logger.Error("received empty access token", "response", string(body)) |
| 451 | + return nil, fmt.Errorf("received empty access token from server") |
| 452 | + } |
| 453 | + |
| 454 | + fleetManager.logger.Info("successfully obtained access token", |
| 455 | + "token_url", tokenURL, |
| 456 | + "expires_in", tokenResponse.ExpiresIn, |
| 457 | + "mqtt_url", tokenResponse.MQTTURL) |
370 | 458 |
|
371 |
| -// getToken is kept for backward-compatibility with existing tests. It delegates |
372 |
| -// to getTokenWithContext using the fleet manager’s root context. |
373 |
| -func (fleetManager *fleetConfigManager) getToken(tokenURL string, clientID string, clientSecret string) (*tokenResponse, error) { |
374 |
| - return fleetManager.getTokenWithContext(fleetManager.heartbeater.heartbeatCtx, tokenURL, clientID, clientSecret) |
| 459 | + return &tokenResponse, nil |
375 | 460 | }
|
376 | 461 |
|
377 | 462 | func (fleetManager *fleetConfigManager) GetContext(ctx context.Context) context.Context {
|
|
0 commit comments