diff --git a/internal/kibana/client.go b/internal/kibana/client.go index 0f5ddaaf4d..6b9522c499 100644 --- a/internal/kibana/client.go +++ b/internal/kibana/client.go @@ -52,11 +52,11 @@ func NewClient(opts ...ClientOption) (*Client, error) { // Allow to initialize version from tests. var zeroVersion VersionInfo if c.semver == nil || c.versionInfo == zeroVersion { - v, err := c.requestVersion() + v, err := c.requestStatus() if err != nil { return nil, fmt.Errorf("failed to get Kibana version: %w", err) } - c.versionInfo = v + c.versionInfo = v.Version c.semver, err = semver.NewVersion(c.versionInfo.Number) if err != nil { diff --git a/internal/kibana/fleet.go b/internal/kibana/fleet.go new file mode 100644 index 0000000000..850fb7fc98 --- /dev/null +++ b/internal/kibana/fleet.go @@ -0,0 +1,45 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package kibana + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" +) + +// DefaultFleetServerURL returns the default Fleet server configured in Kibana +func (c *Client) DefaultFleetServerURL() (string, error) { + path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI) + + statusCode, respBody, err := c.get(path) + if err != nil { + return "", fmt.Errorf("could not reach fleet server hosts endpoint: %w", err) + } + + if statusCode != http.StatusOK { + return "", fmt.Errorf("could not get status data; API status code = %d; response body = %s", statusCode, respBody) + } + + var hosts struct { + Items []struct { + IsDefault bool `json:"is_default"` + HostURLs []string `json:"host_urls"` + } `json:"items"` + } + err = json.Unmarshal(respBody, &hosts) + if err != nil { + return "", fmt.Errorf("failed to decode response: %w", err) + } + + for _, server := range hosts.Items { + if server.IsDefault && len(server.HostURLs) > 0 { + return server.HostURLs[0], nil + } + } + + return "", errors.New("could not find the fleet server URL for this project") +} diff --git a/internal/kibana/status.go b/internal/kibana/status.go index 6f91ddec06..adfef541a5 100644 --- a/internal/kibana/status.go +++ b/internal/kibana/status.go @@ -30,6 +30,11 @@ func (v VersionInfo) IsSnapshot() bool { type statusType struct { Version VersionInfo `json:"version"` + Status struct { + Overall struct { + Level string `json:"level"` + } `json:"overall"` + } `json:"status"` } // Version method returns the version of Kibana (Elastic stack) @@ -37,22 +42,34 @@ func (c *Client) Version() (VersionInfo, error) { return c.versionInfo, nil } -func (c *Client) requestVersion() (VersionInfo, error) { - var version VersionInfo +func (c *Client) requestStatus() (statusType, error) { + var status statusType statusCode, respBody, err := c.get(StatusAPI) if err != nil { - return version, fmt.Errorf("could not reach status endpoint: %w", err) + return status, fmt.Errorf("could not reach status endpoint: %w", err) } if statusCode != http.StatusOK { - return version, fmt.Errorf("could not get status data; API status code = %d; response body = %s", statusCode, respBody) + return status, fmt.Errorf("could not get status data; API status code = %d; response body = %s", statusCode, respBody) } - var status statusType err = json.Unmarshal(respBody, &status) if err != nil { - return version, fmt.Errorf("unmarshalling response failed (body: \n%s): %w", respBody, err) + return status, fmt.Errorf("unmarshalling response 
failed (body: \n%s): %w", respBody, err) + } + + return status, nil +} + +// CheckHealth checks the Kibana health +func (c *Client) CheckHealth() error { + status, err := c.requestStatus() + if err != nil { + return fmt.Errorf("could not reach status endpoint: %w", err) } - return status.Version, nil + if status.Status.Overall.Level != "available" { + return fmt.Errorf("kibana in unhealthy state: %s", status.Status.Overall.Level) + } + return nil } diff --git a/internal/profile/_static/config.yml.example b/internal/profile/_static/config.yml.example index 9146275871..e0f700f692 100644 --- a/internal/profile/_static/config.yml.example +++ b/internal/profile/_static/config.yml.example @@ -1,2 +1,12 @@ # Directory containing GeoIP databases for stacks managed by elastic-agent. # stack.geoip_dir: "/path/to/geoip_dir/" +## Elastic Cloud +# Host URL +# stack.elastic_cloud.host: https://cloud.elastic.co + +## Serverless stack provider +# Project type +# stack.serverless.type: observability +# Region where the Serverless project is going to be created +# stack.serverless.region: aws-us-east-1 + diff --git a/internal/serverless/client.go b/internal/serverless/client.go new file mode 100644 index 0000000000..51ce89cbf7 --- /dev/null +++ b/internal/serverless/client.go @@ -0,0 +1,332 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package serverless + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "time" + + "github.com/elastic/elastic-package/internal/logger" +) + +const ( + defaultHostURL = "https://cloud.elastic.co" + + projectsAPI = "/api/v1/serverless/projects" +) + +type Client struct { + host string + apiKey string +} + +// ClientOption is functional option modifying Serverless API client. +type ClientOption func(*Client) + +var ( + elasticCloudApiKeyEnv = "EC_API_KEY" + elasticCloudEndpointEnv = "EC_HOST" + + ErrProjectNotExist = errors.New("project does not exist") +) + +func NewClient(opts ...ClientOption) (*Client, error) { + apiKey := os.Getenv(elasticCloudApiKeyEnv) + if apiKey == "" { + return nil, fmt.Errorf("unable to obtain value from %s environment variable", elasticCloudApiKeyEnv) + } + c := &Client{ + host: defaultHostURL, + apiKey: apiKey, + } + for _, opt := range opts { + opt(c) + } + + host := os.Getenv(elasticCloudEndpointEnv) + if host != "" { + c.host = host + } + logger.Debugf("Using Elastic Cloud URL: %s", c.host) + return c, nil +} + +// WithAddress option sets the host to use to connect to Kibana. +func WithAddress(address string) ClientOption { + return func(c *Client) { + c.host = address + } +} + +// WithApiKey option sets the host to use to connect to Kibana. 
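For orientation, a minimal usage sketch of the Cloud API client defined in internal/serverless/client.go. It relies only on NewClient and the WithAddress/WithApiKey options from this diff; note that EC_API_KEY must be set regardless of the options (the constructor checks it before applying them), and EC_HOST, when present, overrides any address passed in. The cloud URL shown is the package default; nothing else is assumed.

```go
// Sketch only: assumes the NewClient constructor and the WithAddress /
// WithApiKey options added in internal/serverless/client.go in this diff.
package main

import (
	"log"

	"github.com/elastic/elastic-package/internal/serverless"
)

func main() {
	// NewClient returns an error unless EC_API_KEY is set, even if WithApiKey
	// is also passed, because the environment variable is checked first.
	client, err := serverless.NewClient(
		// EC_HOST, when set, overrides the address given here.
		serverless.WithAddress("https://cloud.elastic.co"),
	)
	if err != nil {
		log.Fatalf("creating serverless client: %v", err)
	}
	_ = client
}
```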
+func WithApiKey(apiKey string) ClientOption { + return func(c *Client) { + c.apiKey = apiKey + } +} + +func (c *Client) get(ctx context.Context, resourcePath string) (int, []byte, error) { + return c.sendRequest(ctx, http.MethodGet, resourcePath, nil) +} + +func (c *Client) post(ctx context.Context, resourcePath string, body []byte) (int, []byte, error) { + return c.sendRequest(ctx, http.MethodPost, resourcePath, body) +} + +func (c *Client) delete(ctx context.Context, resourcePath string) (int, []byte, error) { + return c.sendRequest(ctx, http.MethodDelete, resourcePath, nil) +} + +func (c *Client) sendRequest(ctx context.Context, method, resourcePath string, body []byte) (int, []byte, error) { + request, err := c.newRequest(ctx, method, resourcePath, bytes.NewReader(body)) + if err != nil { + return 0, nil, err + } + + return c.doRequest(request) +} + +func (c *Client) newRequest(ctx context.Context, method, resourcePath string, reqBody io.Reader) (*http.Request, error) { + base, err := url.Parse(c.host) + if err != nil { + return nil, fmt.Errorf("could not create base URL from host: %v: %w", c.host, err) + } + + rel, err := url.Parse(resourcePath) + if err != nil { + return nil, fmt.Errorf("could not create relative URL from resource path: %v: %w", resourcePath, err) + } + + u := base.JoinPath(rel.EscapedPath()) + u.RawQuery = rel.RawQuery + + logger.Debugf("%s %s", method, u) + + req, err := http.NewRequestWithContext(ctx, method, u.String(), reqBody) + if err != nil { + return nil, fmt.Errorf("could not create %v request to Kibana API resource: %s: %w", method, resourcePath, err) + } + + req.Header.Add("content-type", "application/json") + req.Header.Add("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey)) + + return req, nil +} + +func (c *Client) doRequest(request *http.Request) (int, []byte, error) { + client := http.Client{} + + resp, err := client.Do(request) + if err != nil { + return 0, nil, fmt.Errorf("could not send request to Kibana API: %w", err) + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return resp.StatusCode, nil, fmt.Errorf("could not read response body: %w", err) + } + + return resp.StatusCode, body, nil +} + +func (c *Client) CreateProject(name, region, projectType string) (*Project, error) { + ReqBody := struct { + Name string `json:"name"` + RegionID string `json:"region_id"` + }{ + Name: name, + RegionID: region, + } + p, err := json.Marshal(ReqBody) + if err != nil { + return nil, err + } + ctx := context.TODO() + resourcePath, err := url.JoinPath(c.host, projectsAPI, projectType) + if err != nil { + return nil, fmt.Errorf("could not build the URL: %w", err) + } + statusCode, respBody, err := c.post(ctx, resourcePath, p) + + if err != nil { + return nil, fmt.Errorf("error creating project: %w", err) + } + + if statusCode != http.StatusCreated { + return nil, fmt.Errorf("unexpected status code %d, body: %s", statusCode, string(respBody)) + } + + serverlessProject := &Project{url: c.host, apiKey: c.apiKey} + err = json.Unmarshal(respBody, &serverlessProject) + if err != nil { + return nil, fmt.Errorf("error while decoding create project response: %w", err) + } + + err = c.ResetCredentials(ctx, serverlessProject) + if err != nil { + return nil, fmt.Errorf("failed to reset credentials: %w", err) + } + + return serverlessProject, nil +} + +func (c *Client) EnsureProjectInitialized(ctx context.Context, project *Project) error { + timer := time.NewTimer(time.Millisecond) + for { + select { + case <-ctx.Done(): + return 
ctx.Err() + case <-timer.C: + } + + status, err := c.StatusProject(ctx, project) + if err != nil { + logger.Debugf("error querying for status: %s", err.Error()) + timer.Reset(time.Second * 5) + continue + } + + if status != "initialized" { + logger.Debugf("project not initialized, status: %s", status) + timer.Reset(time.Second * 5) + continue + } + + return nil + } +} + +func (c *Client) StatusProject(ctx context.Context, project *Project) (string, error) { + resourcePath, err := url.JoinPath(c.host, projectsAPI, project.Type, project.ID, "status") + if err != nil { + return "", fmt.Errorf("could not build the URL: %w", err) + } + statusCode, respBody, err := c.get(ctx, resourcePath) + + if err != nil { + return "", fmt.Errorf("error getting status project: %w", err) + } + + if statusCode != http.StatusOK { + return "", fmt.Errorf("unexpected status code %d", statusCode) + } + + var status struct { + Phase string `json:"phase"` + } + + if err := json.Unmarshal(respBody, &status); err != nil { + return "", fmt.Errorf("unable to decode status: %w", err) + } + + return status.Phase, nil +} + +func (c *Client) ResetCredentials(ctx context.Context, project *Project) error { + resourcePath, err := url.JoinPath(c.host, projectsAPI, project.Type, project.ID, "_reset-credentials") + if err != nil { + return fmt.Errorf("could not build the URL: %w", err) + } + statusCode, respBody, err := c.post(ctx, resourcePath, nil) + + if err != nil { + return fmt.Errorf("error creating project: %w", err) + } + + if statusCode != http.StatusOK { + return fmt.Errorf("unexpected status code %d", statusCode) + } + + var credentials struct { + Username string `json:"username"` + Password string `json:"password"` + } + if err := json.Unmarshal(respBody, &credentials); err != nil { + return fmt.Errorf("unable to decode credentials: %w", err) + } + + project.Credentials.Username = credentials.Username + project.Credentials.Password = credentials.Password + + return nil +} + +func (c *Client) DeleteProject(project *Project) error { + ctx := context.TODO() + resourcePath, err := url.JoinPath(c.host, projectsAPI, project.Type, project.ID) + if err != nil { + return fmt.Errorf("could not build the URL: %w", err) + } + statusCode, _, err := c.delete(ctx, resourcePath) + if err != nil { + return fmt.Errorf("error deleting project: %w", err) + } + + if statusCode != http.StatusOK { + return fmt.Errorf("unexpected status code %d", statusCode) + } + + return nil +} + +func (c *Client) GetProject(projectType, projectID string) (*Project, error) { + ctx := context.TODO() + resourcePath, err := url.JoinPath(c.host, projectsAPI, projectType, projectID) + if err != nil { + return nil, fmt.Errorf("could not build the URL: %w", err) + } + statusCode, respBody, err := c.get(ctx, resourcePath) + if err != nil { + return nil, fmt.Errorf("error getting project: %w", err) + } + + if statusCode == http.StatusNotFound { + return nil, ErrProjectNotExist + } + + if statusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %d", statusCode) + } + + project := &Project{url: c.host, apiKey: c.apiKey} + err = json.Unmarshal(respBody, &project) + if err != nil { + return nil, fmt.Errorf("failed to decode project: %w", err) + } + + return project, nil +} + +func (c *Client) EnsureEndpoints(ctx context.Context, project *Project) error { + if project.Endpoints.Elasticsearch != "" { + return nil + } + + for { + newProject, err := c.GetProject(project.Type, project.ID) + switch { + case err != nil: + logger.Debugf("request error: 
%s", err.Error()) + case newProject.Endpoints.Elasticsearch != "": + project.Endpoints = newProject.Endpoints + return nil + } + logger.Debugf("Waiting for Elasticsearch endpoint for %s project %q", project.Type, project.ID) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second * 5): + } + } +} diff --git a/internal/serverless/project.go b/internal/serverless/project.go new file mode 100644 index 0000000000..32a4487551 --- /dev/null +++ b/internal/serverless/project.go @@ -0,0 +1,217 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package serverless + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/kibana" + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/registry" +) + +// Project represents a serverless project +type Project struct { + url string + apiKey string + + Name string `json:"name"` + ID string `json:"id"` + Alias string `json:"alias"` + Type string `json:"type"` + Region string `json:"region_id"` + + Credentials struct { + Username string `json:"username"` + Password string `json:"password"` + } `json:"credentials"` + + Endpoints struct { + Elasticsearch string `json:"elasticsearch"` + Kibana string `json:"kibana"` + Fleet string `json:"fleet,omitempty"` + APM string `json:"apm,omitempty"` + } `json:"endpoints"` +} + +func (p *Project) EnsureHealthy(ctx context.Context, elasticsearchClient *elasticsearch.Client, kibanaClient *kibana.Client) error { + if err := p.ensureElasticsearchHealthy(ctx, elasticsearchClient); err != nil { + return fmt.Errorf("elasticsearch not healthy: %w", err) + } + if err := p.ensureKibanaHealthy(ctx, kibanaClient); err != nil { + return fmt.Errorf("kibana not healthy: %w", err) + } + if err := p.ensureFleetHealthy(ctx); err != nil { + return fmt.Errorf("fleet not healthy: %w", err) + } + return nil +} + +func (p *Project) Status(ctx context.Context, elasticsearchClient *elasticsearch.Client, kibanaClient *kibana.Client) (map[string]string, error) { + var status map[string]string + healthStatus := func(err error) string { + if err != nil { + return fmt.Sprintf("unhealthy: %s", err.Error()) + } + return "healthy" + } + + status = map[string]string{ + "elasticsearch": healthStatus(p.getESHealth(ctx, elasticsearchClient)), + "kibana": healthStatus(p.getKibanaHealth(kibanaClient)), + "fleet": healthStatus(p.getFleetHealth(ctx)), + } + return status, nil +} + +func (p *Project) ensureElasticsearchHealthy(ctx context.Context, elasticsearchClient *elasticsearch.Client) error { + for { + err := elasticsearchClient.CheckHealth(ctx) + if err == nil { + return nil + } + + logger.Debugf("Elasticsearch service not ready: %s", err.Error()) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } +} + +func (p *Project) ensureKibanaHealthy(ctx context.Context, kibanaClient *kibana.Client) error { + for { + err := kibanaClient.CheckHealth() + if err == nil { + return nil + } + + logger.Debugf("Kibana service not ready: %s", err.Error()) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } +} + +func (p *Project) ensureFleetHealthy(ctx context.Context) error { + for 
{ + err := p.getFleetHealth(ctx) + if err == nil { + return nil + } + + logger.Debugf("Fleet service not ready: %s", err.Error()) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } +} + +func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, error) { + fleetURL, err := kibanaClient.DefaultFleetServerURL() + if err != nil { + return "", fmt.Errorf("failed to query fleet server hosts: %w", err) + } + + return fleetURL, nil +} + +func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error { + return elasticsearchClient.CheckHealth(ctx) +} + +func (p *Project) getKibanaHealth(kibanaClient *kibana.Client) error { + return kibanaClient.CheckHealth() +} + +func (p *Project) getFleetHealth(ctx context.Context) error { + statusURL, err := url.JoinPath(p.Endpoints.Fleet, "/api/status") + if err != nil { + return fmt.Errorf("could not build URL: %w", err) + } + logger.Debugf("GET %s", statusURL) + req, err := http.NewRequestWithContext(ctx, "GET", statusURL, nil) + if err != nil { + return err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("request failed (url: %s): %w", statusURL, err) + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return fmt.Errorf("unexpected status code %v", resp.StatusCode) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + var status struct { + Name string `json:"name"` + Status string `json:"status"` + } + err = json.Unmarshal(body, &status) + if err != nil { + return fmt.Errorf("failed to parse response body: %w", err) + } + + if status.Status != "HEALTHY" { + return fmt.Errorf("fleet status %s", status.Status) + + } + return nil +} + +func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client) error { + systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{ + KibanaVersion: stackVersion, + }) + if err != nil { + return fmt.Errorf("could not get the system package version for kibana %v: %w", stackVersion, err) + } + if len(systemPackages) != 1 { + return fmt.Errorf("unexpected number of system package versions - found %d expected 1", len(systemPackages)) + } + logger.Debugf("Found %s package - version %s", systemPackages[0].Name, systemPackages[0].Version) + + policy := kibana.Policy{ + ID: "elastic-agent-managed-ep", + Name: "Elastic-Agent (elastic-package)", + Description: "Policy created by elastic-package", + Namespace: "default", + MonitoringEnabled: []string{"logs", "metrics"}, + } + newPolicy, err := kibanaClient.CreatePolicy(policy) + if err != nil { + return fmt.Errorf("error while creating agent policy: %w", err) + } + + packagePolicy := kibana.PackagePolicy{ + Name: "system-1", + PolicyID: newPolicy.ID, + Namespace: newPolicy.Namespace, + } + packagePolicy.Package.Name = "system" + packagePolicy.Package.Version = systemPackages[0].Version + + _, err = kibanaClient.CreatePackagePolicy(packagePolicy) + if err != nil { + return fmt.Errorf("error while creating package policy: %w", err) + } + + return nil +} diff --git a/internal/stack/_static/docker-compose-stack.yml.tmpl b/internal/stack/_static/docker-compose-stack.yml.tmpl index 141448eee8..d1033c19cc 100644 --- a/internal/stack/_static/docker-compose-stack.yml.tmpl +++ b/internal/stack/_static/docker-compose-stack.yml.tmpl @@ -102,11 +102,11 @@ services: - "FLEET_SERVER_ENABLE=1" - "FLEET_SERVER_HOST=0.0.0.0" - 
"FLEET_SERVER_SERVICE_TOKEN=AAEAAWVsYXN0aWMvZmxlZXQtc2VydmVyL2VsYXN0aWMtcGFja2FnZS1mbGVldC1zZXJ2ZXItdG9rZW46bmgtcFhoQzRRQ2FXbms2U0JySGlWQQ" - - "FLEET_URL=https://fleet-server:8220" - - "KIBANA_FLEET_HOST=https://kibana:5601" + - "FLEET_URL={{ fact "fleet_url" }}" + - "KIBANA_FLEET_HOST={{ fact "kibana_host" }}" - "KIBANA_FLEET_SERVICE_TOKEN=AAEAAWVsYXN0aWMvZmxlZXQtc2VydmVyL2VsYXN0aWMtcGFja2FnZS1mbGVldC1zZXJ2ZXItdG9rZW46bmgtcFhoQzRRQ2FXbms2U0JySGlWQQ" - "KIBANA_FLEET_SETUP=1" - - "KIBANA_HOST=https://kibana:5601" + - "KIBANA_HOST={{ fact "kibana_host" }}" volumes: - "../certs/ca-cert.pem:/etc/ssl/certs/elastic-package.pem" - "../certs/fleet-server:/etc/ssl/elastic-agent" diff --git a/internal/stack/_static/elastic-agent.env.tmpl b/internal/stack/_static/elastic-agent.env.tmpl index 70a0b5cb58..d8888122e1 100644 --- a/internal/stack/_static/elastic-agent.env.tmpl +++ b/internal/stack/_static/elastic-agent.env.tmpl @@ -1,8 +1,10 @@ {{ $version := fact "agent_version" }} FLEET_ENROLL=1 -FLEET_URL=https://fleet-server:8220 -KIBANA_FLEET_HOST=https://kibana:5601 -KIBANA_HOST=https://kibana:5601 +FLEET_URL={{ fact "fleet_url" }} +KIBANA_FLEET_HOST={{ fact "kibana_host" }} +KIBANA_HOST={{ fact "kibana_host" }} +ELASTICSEARCH_USERNAME={{ fact "username" }} +ELASTICSEARCH_PASSWORD={{ fact "password" }} {{ if not (semverLessThan $version "8.0.0") }} FLEET_TOKEN_POLICY_NAME=Elastic-Agent (elastic-package) {{ end }} diff --git a/internal/stack/_static/kibana.yml.tmpl b/internal/stack/_static/kibana.yml.tmpl index 9e52be0d8a..80b908c0f9 100644 --- a/internal/stack/_static/kibana.yml.tmpl +++ b/internal/stack/_static/kibana.yml.tmpl @@ -26,7 +26,7 @@ xpack.fleet.agents.elasticsearch.hosts: ["https://elasticsearch:9200"] xpack.fleet.registryUrl: "https://package-registry:8080" xpack.fleet.agents.enabled: true -xpack.fleet.agents.fleet_server.hosts: ["https://fleet-server:8220"] +xpack.fleet.agents.fleet_server.hosts: ["{{ fact "fleet_url" }}"] {{ if and (not (semverLessThan $version "8.7.0")) (semverLessThan $version "8.10.0-SNAPSHOT") }} xpack.fleet.enableExperimental: ["experimentalDataStreamSettings"] # Enable experimental toggles in Fleet UI diff --git a/internal/stack/_static/serverless-elastic-agent.yml.tmpl b/internal/stack/_static/serverless-elastic-agent.yml.tmpl new file mode 100644 index 0000000000..94744bb2b7 --- /dev/null +++ b/internal/stack/_static/serverless-elastic-agent.yml.tmpl @@ -0,0 +1,26 @@ +version: '2.3' +services: + elastic-agent: + image: "{{ fact "agent_image" }}" + healthcheck: + test: "elastic-agent status" + timeout: 2s + start_period: 360s + retries: 180 + interval: 5s + hostname: docker-fleet-agent + env_file: "./elastic-agent.env" + volumes: + - type: bind + source: ../../../tmp/service_logs/ + target: /tmp/service_logs/ + # Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235). + - type: bind + source: ../../../tmp/service_logs/ + target: /run/service_logs/ + + elastic-agent_is_ready: + image: tianon/true + depends_on: + elastic-agent: + condition: service_healthy diff --git a/internal/stack/dump.go b/internal/stack/dump.go index 1e94c1f561..cb53615740 100644 --- a/internal/stack/dump.go +++ b/internal/stack/dump.go @@ -18,8 +18,6 @@ const ( fleetServerService = "fleet-server" ) -var observedServices = []string{"elasticsearch", elasticAgentService, fleetServerService, "kibana", "package-registry"} - // DumpOptions defines dumping options for Elatic stack data. 
type DumpOptions struct { Output string @@ -50,7 +48,12 @@ func dumpStackLogs(options DumpOptions) error { return fmt.Errorf("can't create output location (path: %s): %w", logsPath, err) } - for _, serviceName := range observedServices { + services, err := localServiceNames(DockerComposeProjectName(options.Profile)) + if err != nil { + return fmt.Errorf("failed to get local services: %w", err) + } + + for _, serviceName := range services { logger.Debugf("Dump stack logs for %s", serviceName) content, err := dockerComposeLogs(serviceName, options.Profile) diff --git a/internal/stack/providers.go b/internal/stack/providers.go index e782424b35..730ad7cedb 100644 --- a/internal/stack/providers.go +++ b/internal/stack/providers.go @@ -12,13 +12,15 @@ import ( ) const ( - ProviderCompose = "compose" + ProviderCompose = "compose" + ProviderServerless = "serverless" ) var ( DefaultProvider = ProviderCompose SupportedProviders = []string{ ProviderCompose, + ProviderServerless, } ) @@ -50,8 +52,10 @@ type Provider interface { // BuildProvider returns the provider for the given name. func BuildProvider(name string, profile *profile.Profile) (Provider, error) { switch name { - case "compose": + case ProviderCompose: return &composeProvider{}, nil + case ProviderServerless: + return newServerlessProvider(profile) } return nil, fmt.Errorf("unknown provider %q, supported providers: %s", name, strings.Join(SupportedProviders, ", ")) } diff --git a/internal/stack/resources.go b/internal/stack/resources.go index 9a235ac142..05701c0587 100644 --- a/internal/stack/resources.go +++ b/internal/stack/resources.go @@ -116,6 +116,9 @@ func applyResources(profile *profile.Profile, stackVersion string) error { "kibana_version": stackVersion, "agent_version": stackVersion, + "kibana_host": "https://kibana:5601", + "fleet_url": "https://fleet-server:8220", + "username": elasticsearchUsername, "password": elasticsearchPassword, diff --git a/internal/stack/serverless.go b/internal/stack/serverless.go new file mode 100644 index 0000000000..565ceeebb3 --- /dev/null +++ b/internal/stack/serverless.go @@ -0,0 +1,475 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
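The new file internal/stack/serverless.go below implements the stack provider for serverless projects. As a sketch of how it is reached from the rest of the code, relying only on the exported names this diff references (BuildProvider, ProviderServerless, and the Options fields Profile and StackVersion); other Options fields such as Printer are omitted here, so treat this as an outline rather than a complete call.

```go
// Sketch only: uses BuildProvider, ProviderServerless and the Options fields
// (Profile, StackVersion) referenced elsewhere in this diff; remaining Options
// fields (e.g. Printer) are left out for brevity.
package stackdocs

import (
	"github.com/elastic/elastic-package/internal/profile"
	"github.com/elastic/elastic-package/internal/stack"
)

func bootServerless(p *profile.Profile, stackVersion string) error {
	provider, err := stack.BuildProvider(stack.ProviderServerless, p)
	if err != nil {
		return err
	}
	// BootUp creates the serverless project (or reuses the one recorded in the
	// profile config) and then starts the local Elastic Agent via compose.
	return provider.BootUp(stack.Options{
		Profile:      p,
		StackVersion: stackVersion,
	})
}
```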
+ +package stack + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/elastic/elastic-package/internal/common" + "github.com/elastic/elastic-package/internal/compose" + "github.com/elastic/elastic-package/internal/docker" + "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/kibana" + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/profile" + "github.com/elastic/elastic-package/internal/serverless" +) + +const ( + paramServerlessProjectID = "serverless_project_id" + paramServerlessProjectType = "serverless_project_type" + paramServerlessFleetURL = "serverless_fleet_url" + + configRegion = "stack.serverless.region" + configProjectType = "stack.serverless.type" + configElasticCloudURL = "stack.elastic_cloud.host" + + defaultRegion = "aws-us-east-1" + defaultProjectType = "observability" +) + +var ( + allowedProjectTypes = []string{ + "security", + "observability", + } +) + +type serverlessProvider struct { + profile *profile.Profile + client *serverless.Client + + elasticsearchClient *elasticsearch.Client + kibanaClient *kibana.Client +} + +type projectSettings struct { + Name string + Region string + Type string + + StackVersion string +} + +func (sp *serverlessProvider) createProject(settings projectSettings, options Options, conf Config) (Config, error) { + project, err := sp.client.CreateProject(settings.Name, settings.Region, settings.Type) + if err != nil { + return Config{}, fmt.Errorf("failed to create %s project %s in %s: %w", settings.Type, settings.Name, settings.Region, err) + } + + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*30) + defer cancel() + if err := sp.client.EnsureEndpoints(ctx, project); err != nil { + return Config{}, fmt.Errorf("failed to ensure endpoints have been provisioned properly: %w", err) + } + + var config Config + config.Provider = ProviderServerless + config.Parameters = map[string]string{ + paramServerlessProjectID: project.ID, + paramServerlessProjectType: project.Type, + } + + config.ElasticsearchHost = project.Endpoints.Elasticsearch + config.KibanaHost = project.Endpoints.Kibana + config.ElasticsearchUsername = project.Credentials.Username + config.ElasticsearchPassword = project.Credentials.Password + + // Store config now in case fails initialization or other requests, + // so it can be destroyed later + err = storeConfig(sp.profile, config) + if err != nil { + return Config{}, fmt.Errorf("failed to store config: %w", err) + } + + logger.Debug("Waiting for creation plan to be completed") + err = sp.client.EnsureProjectInitialized(ctx, project) + if err != nil { + return Config{}, fmt.Errorf("project not initialized: %w", err) + } + + err = sp.createClients(project) + if err != nil { + return Config{}, err + } + + config.Parameters[paramServerlessFleetURL], err = project.DefaultFleetServerURL(sp.kibanaClient) + if err != nil { + return Config{}, fmt.Errorf("failed to get fleet URL: %w", err) + } + project.Endpoints.Fleet = config.Parameters[paramServerlessFleetURL] + + printUserConfig(options.Printer, config) + + // update config with latest updates (e.g. 
fleet server url) + err = storeConfig(sp.profile, config) + if err != nil { + return Config{}, fmt.Errorf("failed to store config: %w", err) + } + + err = project.EnsureHealthy(ctx, sp.elasticsearchClient, sp.kibanaClient) + if err != nil { + return Config{}, fmt.Errorf("not all services are healthy: %w", err) + } + + return config, nil +} + +func (sp *serverlessProvider) deleteProject(project *serverless.Project, options Options) error { + return sp.client.DeleteProject(project) +} + +func (sp *serverlessProvider) currentProject(config Config) (*serverless.Project, error) { + projectID, found := config.Parameters[paramServerlessProjectID] + if !found { + return nil, serverless.ErrProjectNotExist + } + + projectType, found := config.Parameters[paramServerlessProjectType] + if !found { + return nil, serverless.ErrProjectNotExist + } + + project, err := sp.client.GetProject(projectType, projectID) + if errors.Is(serverless.ErrProjectNotExist, err) { + return nil, err + } + if err != nil { + return nil, fmt.Errorf("couldn't check project health: %w", err) + } + + project.Credentials.Username = config.ElasticsearchUsername + project.Credentials.Password = config.ElasticsearchPassword + + err = sp.createClients(project) + if err != nil { + return nil, err + } + + fleetURL := config.Parameters[paramServerlessFleetURL] + if true { + fleetURL, err = project.DefaultFleetServerURL(sp.kibanaClient) + if err != nil { + return nil, fmt.Errorf("failed to get fleet URL: %w", err) + } + } + project.Endpoints.Fleet = fleetURL + + return project, nil +} + +func (sp *serverlessProvider) createClients(project *serverless.Project) error { + var err error + sp.elasticsearchClient, err = NewElasticsearchClient( + elasticsearch.OptionWithAddress(project.Endpoints.Elasticsearch), + elasticsearch.OptionWithUsername(project.Credentials.Username), + elasticsearch.OptionWithPassword(project.Credentials.Password), + ) + if err != nil { + return fmt.Errorf("failed to create elasticsearch client: %w", err) + } + + sp.kibanaClient, err = NewKibanaClient( + kibana.Address(project.Endpoints.Kibana), + kibana.Username(project.Credentials.Username), + kibana.Password(project.Credentials.Password), + ) + if err != nil { + return fmt.Errorf("failed to create kibana client: %w", err) + } + + return nil +} + +func getProjectSettings(options Options) (projectSettings, error) { + s := projectSettings{ + Name: createProjectName(options), + Type: options.Profile.Config(configProjectType, defaultProjectType), + Region: options.Profile.Config(configRegion, defaultRegion), + StackVersion: options.StackVersion, + } + + return s, nil +} + +func createProjectName(options Options) string { + return fmt.Sprintf("elastic-package-test-%s", options.Profile.ProfileName) +} + +func newServerlessProvider(profile *profile.Profile) (*serverlessProvider, error) { + host := profile.Config(configElasticCloudURL, "") + options := []serverless.ClientOption{} + if host != "" { + options = append(options, serverless.WithAddress(host)) + } + client, err := serverless.NewClient(options...) 
+ if err != nil { + return nil, fmt.Errorf("can't create serverless provider: %w", err) + } + + return &serverlessProvider{profile, client, nil, nil}, nil +} + +func (sp *serverlessProvider) BootUp(options Options) error { + logger.Warn("Elastic Serverless provider is in technical preview") + + config, err := LoadConfig(sp.profile) + if err != nil { + return fmt.Errorf("failed to load configuration: %w", err) + } + + settings, err := getProjectSettings(options) + if err != nil { + return err + } + + if !common.StringSliceContains(allowedProjectTypes, settings.Type) { + return fmt.Errorf("serverless project type not supported: %s", settings.Type) + } + + var project *serverless.Project + + project, err = sp.currentProject(config) + switch err { + default: + return err + case serverless.ErrProjectNotExist: + logger.Infof("Creating %s project: %q", settings.Type, settings.Name) + config, err = sp.createProject(settings, options, config) + if err != nil { + return fmt.Errorf("failed to create deployment: %w", err) + } + + project, err = sp.currentProject(config) + if err != nil { + return fmt.Errorf("failed to retrieve latest project created: %w", err) + } + + err = sp.createClients(project) + if err != nil { + return err + } + + logger.Infof("Creating agent policy") + err = project.CreateAgentPolicy(options.StackVersion, sp.kibanaClient) + if err != nil { + return fmt.Errorf("failed to create agent policy: %w", err) + } + + // TODO: Ensuring a specific GeoIP database would make tests reproducible + // Currently geo ip files would be ignored when running pipeline tests + case nil: + logger.Debugf("%s project existed: %s", project.Type, project.Name) + printUserConfig(options.Printer, config) + } + + logger.Infof("Starting local agent") + err = sp.startLocalAgent(options, config) + if err != nil { + return fmt.Errorf("failed to start local agent: %w", err) + } + + return nil +} + +func (sp *serverlessProvider) composeProjectName() string { + return DockerComposeProjectName(sp.profile) +} + +func (sp *serverlessProvider) localAgentComposeProject() (*compose.Project, error) { + composeFile := sp.profile.Path(profileStackPath, SnapshotFile) + return compose.NewProject(sp.composeProjectName(), composeFile) +} + +func (sp *serverlessProvider) startLocalAgent(options Options, config Config) error { + err := applyServerlessResources(sp.profile, options.StackVersion, config) + if err != nil { + return fmt.Errorf("could not initialize compose files for local agent: %w", err) + } + + project, err := sp.localAgentComposeProject() + if err != nil { + return fmt.Errorf("could not initialize local agent compose project") + } + + err = project.Build(compose.CommandOptions{}) + if err != nil { + return fmt.Errorf("failed to build images for local agent: %w", err) + } + + err = project.Up(compose.CommandOptions{ExtraArgs: []string{"-d"}}) + if err != nil { + // At least starting on 8.6.0, fleet-server may be reconfigured or + // restarted after being healthy. If elastic-agent tries to enroll at + // this moment, it fails inmediately, stopping and making `docker-compose up` + // to fail too. + // As a workaround, try to give another chance to docker-compose if only + // elastic-agent failed. 
+ fmt.Println("Elastic Agent failed to start, trying again.") + err = project.Up(compose.CommandOptions{ExtraArgs: []string{"-d"}}) + if err != nil { + return fmt.Errorf("failed to start local agent: %w", err) + } + } + + return nil +} + +func (sp *serverlessProvider) TearDown(options Options) error { + config, err := LoadConfig(sp.profile) + if err != nil { + return fmt.Errorf("failed to load configuration: %w", err) + } + + err = sp.destroyLocalAgent() + if err != nil { + return fmt.Errorf("failed to destroy local agent: %w", err) + } + + project, err := sp.currentProject(config) + if err != nil { + return fmt.Errorf("failed to find current project: %w", err) + } + + logger.Debugf("Deleting project %q (%s)", project.Name, project.ID) + + err = sp.deleteProject(project, options) + if err != nil { + return fmt.Errorf("failed to delete project: %w", err) + } + + // TODO: if GeoIP database is specified, remove the geoip Bundle (if needed) + return nil +} + +func (sp *serverlessProvider) destroyLocalAgent() error { + project, err := sp.localAgentComposeProject() + if err != nil { + return fmt.Errorf("could not initialize local agent compose project") + } + + err = project.Down(compose.CommandOptions{}) + if err != nil { + return fmt.Errorf("failed to destroy local agent: %w", err) + } + + return nil +} + +func (sp *serverlessProvider) Update(options Options) error { + return fmt.Errorf("not implemented") +} + +func (sp *serverlessProvider) Dump(options DumpOptions) (string, error) { + return Dump(options) +} + +func (sp *serverlessProvider) Status(options Options) ([]ServiceStatus, error) { + logger.Warn("Elastic Serverless provider is in technical preview") + config, err := LoadConfig(sp.profile) + if err != nil { + return nil, fmt.Errorf("failed to load configuration: %w", err) + } + + project, err := sp.currentProject(config) + if errors.Is(serverless.ErrProjectNotExist, err) { + return nil, nil + } + if err != nil { + return nil, err + } + + ctx := context.TODO() + projectServiceStatus, err := project.Status(ctx, sp.elasticsearchClient, sp.kibanaClient) + if err != nil { + return nil, err + } + + serverlessVersion := fmt.Sprintf("serverless (%s)", project.Type) + var serviceStatus []ServiceStatus + for service, status := range projectServiceStatus { + serviceStatus = append(serviceStatus, ServiceStatus{ + Name: service, + Version: serverlessVersion, + Status: status, + }) + } + + agentStatus, err := sp.localAgentStatus() + if err != nil { + return nil, fmt.Errorf("failed to get local agent status: %w", err) + } + + serviceStatus = append(serviceStatus, agentStatus...) 
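The Fleet URL this provider resolves via project.DefaultFleetServerURL ultimately comes from the fleet_server_hosts endpoint added in internal/kibana/fleet.go. As a self-contained check of the decoding that helper performs — the JSON payload and host names below are made up for illustration only:

```go
// Standalone illustration of the response shape DefaultFleetServerURL
// (internal/kibana/fleet.go above) expects from Kibana's Fleet API
// fleet_server_hosts endpoint. The payload is a fabricated example.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	payload := []byte(`{
		"items": [
			{"is_default": false, "host_urls": ["https://internal.example:8220"]},
			{"is_default": true, "host_urls": ["https://fleet.example.elastic.cloud:443"]}
		]
	}`)

	var hosts struct {
		Items []struct {
			IsDefault bool     `json:"is_default"`
			HostURLs  []string `json:"host_urls"`
		} `json:"items"`
	}
	if err := json.Unmarshal(payload, &hosts); err != nil {
		panic(err)
	}

	for _, h := range hosts.Items {
		if h.IsDefault && len(h.HostURLs) > 0 {
			fmt.Println("default fleet server:", h.HostURLs[0])
			return
		}
	}
	fmt.Println("no default fleet server found")
}
```

Only the first host_urls entry of the default host is kept, and that value is what ends up in the FLEET_URL fact consumed by the local agent templates.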
+ + return serviceStatus, nil +} + +func (sp *serverlessProvider) localAgentStatus() ([]ServiceStatus, error) { + var services []ServiceStatus + serviceStatusFunc := func(description docker.ContainerDescription) error { + service, err := newServiceStatus(&description) + if err != nil { + return err + } + services = append(services, *service) + return nil + } + + err := runOnLocalServices(sp.composeProjectName(), serviceStatusFunc) + if err != nil { + return nil, err + } + + return services, nil +} + +func localServiceNames(project string) ([]string, error) { + services := []string{} + serviceFunc := func(description docker.ContainerDescription) error { + services = append(services, description.Config.Labels[serviceLabelDockerCompose]) + return nil + } + + err := runOnLocalServices(project, serviceFunc) + if err != nil { + return nil, err + } + + return services, nil +} + +func runOnLocalServices(project string, serviceFunc func(docker.ContainerDescription) error) error { + // query directly to docker to avoid load environment variables (e.g. STACK_VERSION_VARIANT) and profiles + containerIDs, err := docker.ContainerIDsWithLabel(projectLabelDockerCompose, project) + if err != nil { + return err + } + + if len(containerIDs) == 0 { + return nil + } + + containerDescriptions, err := docker.InspectContainers(containerIDs...) + if err != nil { + return err + } + + for _, containerDescription := range containerDescriptions { + serviceName := containerDescription.Config.Labels[serviceLabelDockerCompose] + if strings.HasSuffix(serviceName, readyServicesSuffix) { + continue + } + err := serviceFunc(containerDescription) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/stack/serverlessresources.go b/internal/stack/serverlessresources.go new file mode 100644 index 0000000000..76c1630d3d --- /dev/null +++ b/internal/stack/serverlessresources.go @@ -0,0 +1,67 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
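The file below registers the fact values consumed by the {{ fact "kibana_host" }} and {{ fact "fleet_url" }} placeholders in the templates changed earlier. Purely as an illustration of that substitution — using text/template with a stand-in fact function and hypothetical endpoint values, not the go-resource engine the code actually uses:

```go
// Illustration only: mimics how {{ fact "fleet_url" }} style placeholders are
// filled in, using text/template and a stand-in "fact" function. The real
// rendering is done by github.com/elastic/go-resource, as in the file below.
package main

import (
	"os"
	"text/template"
)

const envTmpl = `FLEET_URL={{ fact "fleet_url" }}
KIBANA_FLEET_HOST={{ fact "kibana_host" }}
`

func main() {
	facts := map[string]string{
		// Hypothetical serverless endpoints; the compose provider instead keeps
		// the in-cluster defaults set in internal/stack/resources.go.
		"kibana_host": "https://my-project.kb.us-east-1.aws.elastic.cloud:443",
		"fleet_url":   "https://my-project.fleet.us-east-1.aws.elastic.cloud:443",
	}

	tmpl := template.Must(template.New("env").Funcs(template.FuncMap{
		"fact": func(name string) string { return facts[name] },
	}).Parse(envTmpl))

	if err := tmpl.Execute(os.Stdout, nil); err != nil {
		panic(err)
	}
}
```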
+ +package stack + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/elastic/go-resource" + + "github.com/elastic/elastic-package/internal/install" + "github.com/elastic/elastic-package/internal/profile" +) + +var ( + serverlessStackResources = []resource.Resource{ + &resource.File{ + Path: SnapshotFile, + Content: staticSource.Template("_static/serverless-elastic-agent.yml.tmpl"), + }, + &resource.File{ + Path: ElasticAgentEnvFile, + Content: staticSource.Template("_static/elastic-agent.env.tmpl"), + }, + } +) + +func applyServerlessResources(profile *profile.Profile, stackVersion string, config Config) error { + appConfig, err := install.Configuration() + if err != nil { + return fmt.Errorf("can't read application configuration: %w", err) + } + + stackDir := filepath.Join(profile.ProfilePath, profileStackPath) + + resourceManager := resource.NewManager() + resourceManager.AddFacter(resource.StaticFacter{ + "agent_version": stackVersion, + "agent_image": appConfig.StackImageRefs(stackVersion).ElasticAgent, + "username": config.ElasticsearchUsername, + "password": config.ElasticsearchPassword, + "kibana_host": config.KibanaHost, + "fleet_url": config.Parameters[paramServerlessFleetURL], + }) + + os.MkdirAll(stackDir, 0755) + resourceManager.RegisterProvider("file", &resource.FileProvider{ + Prefix: stackDir, + }) + + results, err := resourceManager.Apply(serverlessStackResources) + if err != nil { + var errors []string + for _, result := range results { + if err := result.Err(); err != nil { + errors = append(errors, err.Error()) + } + } + return fmt.Errorf("%w: %s", err, strings.Join(errors, ", ")) + } + + return nil +} diff --git a/internal/testrunner/runners/pipeline/runner.go b/internal/testrunner/runners/pipeline/runner.go index 57d1d6f2bd..3c3e721779 100644 --- a/internal/testrunner/runners/pipeline/runner.go +++ b/internal/testrunner/runners/pipeline/runner.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/elastic-package/internal/multierror" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/signal" + "github.com/elastic/elastic-package/internal/stack" "github.com/elastic/elastic-package/internal/testrunner" ) @@ -291,7 +292,16 @@ func (r *runner) verifyResults(testCaseFile string, config *testConfig, result * } } - err := compareResults(testCasePath, config, result) + // TODO: currently GeoIP related fields are being removed when the serverless provider is used. 
+ stackConfig, err := stack.LoadConfig(r.options.Profile) + if err != nil { + return err + } + skipGeoIP := false + if stackConfig.Provider == stack.ProviderServerless { + skipGeoIP = true + } + err = compareResults(testCasePath, config, result, skipGeoIP) if _, ok := err.(testrunner.ErrTestCaseFailed); ok { return err } diff --git a/internal/testrunner/runners/pipeline/test_result.go b/internal/testrunner/runners/pipeline/test_result.go index 29c9fb8380..f544c9ebd7 100644 --- a/internal/testrunner/runners/pipeline/test_result.go +++ b/internal/testrunner/runners/pipeline/test_result.go @@ -23,6 +23,25 @@ import ( const expectedTestResultSuffix = "-expected.json" +var geoIPKeys = []string{ + "as", + "geo", + "client.as", + "client.geo", + "destination.as", + "destination.geo", + "host.geo", // not defined host.as in ECS + "observer.geo", // not defined observer.as in ECS + "server.as", + "server.geo", + "source.as", + "source.geo", + "threat.enrichments.indicateor.as", + "threat.enrichments.indicateor.geo", + "threat.indicateor.as", + "threat.indicateor.geo", +} + type testResult struct { events []json.RawMessage } @@ -46,8 +65,8 @@ func writeTestResult(testCasePath string, result *testResult) error { return nil } -func compareResults(testCasePath string, config *testConfig, result *testResult) error { - resultsWithoutDynamicFields, err := adjustTestResult(result, config) +func compareResults(testCasePath string, config *testConfig, result *testResult, skipGeoip bool) error { + resultsWithoutDynamicFields, err := adjustTestResult(result, config, skipGeoip) if err != nil { return fmt.Errorf("can't adjust test results: %w", err) } @@ -57,7 +76,7 @@ func compareResults(testCasePath string, config *testConfig, result *testResult) return fmt.Errorf("marshalling actual test results failed: %w", err) } - expectedResults, err := readExpectedTestResult(testCasePath, config) + expectedResults, err := readExpectedTestResult(testCasePath, config, skipGeoip) if err != nil { return fmt.Errorf("reading expected test result failed: %w", err) } @@ -138,7 +157,7 @@ func diffJson(want, got []byte) (string, error) { return buf.String(), err } -func readExpectedTestResult(testCasePath string, config *testConfig) (*testResult, error) { +func readExpectedTestResult(testCasePath string, config *testConfig, skipGeoIP bool) (*testResult, error) { testCaseDir := filepath.Dir(testCasePath) testCaseFile := filepath.Base(testCasePath) @@ -153,19 +172,17 @@ func readExpectedTestResult(testCasePath string, config *testConfig) (*testResul return nil, fmt.Errorf("unmarshalling expected test result failed: %w", err) } - adjusted, err := adjustTestResult(u, config) + adjusted, err := adjustTestResult(u, config, skipGeoIP) if err != nil { return nil, fmt.Errorf("adjusting test result failed: %w", err) } return adjusted, nil } -func adjustTestResult(result *testResult, config *testConfig) (*testResult, error) { - if config == nil || config.DynamicFields == nil { +func adjustTestResult(result *testResult, config *testConfig, skipGeoIP bool) (*testResult, error) { + if !skipGeoIP && (config == nil || config.DynamicFields == nil) { return result, nil } - - // Strip dynamic fields from test result var stripped testResult for _, event := range result.events { if event == nil { @@ -179,10 +196,22 @@ func adjustTestResult(result *testResult, config *testConfig) (*testResult, erro return nil, fmt.Errorf("can't unmarshal event: %s: %w", string(event), err) } - for key := range config.DynamicFields { - err := m.Delete(key) - if err != 
nil && err != common.ErrKeyNotFound { - return nil, fmt.Errorf("can't remove dynamic field: %w", err) + if config != nil && config.DynamicFields != nil { + // Strip dynamic fields from test result + for key := range config.DynamicFields { + err := m.Delete(key) + if err != nil && err != common.ErrKeyNotFound { + return nil, fmt.Errorf("can't remove dynamic field: %w", err) + } + } + } + + if skipGeoIP { + for _, key := range geoIPKeys { + err := m.Delete(key) + if err != nil && err != common.ErrKeyNotFound { + return nil, fmt.Errorf("can't remove geoIP field: %w", err) + } } }
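To make the effect of skipGeoIP concrete, a standalone example of a GeoIP-derived object being dropped from an event before results are compared. deleteKey is a hypothetical helper over plain maps, standing in for the internal common package used by the code above:

```go
// Standalone illustration of what skipGeoIP does to an event before the
// actual and expected documents are diffed. deleteKey is a made-up helper,
// not part of elastic-package.
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// deleteKey removes a dotted key such as "source.geo" from a decoded event.
func deleteKey(event map[string]any, dottedKey string) {
	parts := strings.Split(dottedKey, ".")
	m := event
	for _, p := range parts[:len(parts)-1] {
		next, ok := m[p].(map[string]any)
		if !ok {
			return // key not present; nothing to do
		}
		m = next
	}
	delete(m, parts[len(parts)-1])
}

func main() {
	raw := []byte(`{"source": {"ip": "8.8.8.8", "geo": {"country_iso_code": "US"}}, "message": "ok"}`)

	var event map[string]any
	if err := json.Unmarshal(raw, &event); err != nil {
		panic(err)
	}

	// With the serverless provider, GeoIP-derived objects are dropped from both
	// the actual and the expected documents, so the comparison ignores them.
	deleteKey(event, "source.geo")

	out, _ := json.Marshal(event)
	fmt.Println(string(out)) // {"message":"ok","source":{"ip":"8.8.8.8"}}
}
```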