diff --git a/cmd/dcp/main.go b/cmd/dcp/main.go index bbc262ce..c7ddddd2 100644 --- a/cmd/dcp/main.go +++ b/cmd/dcp/main.go @@ -8,12 +8,17 @@ package main //go:generate goversioninfo import ( + "context" "os" + "path/filepath" + "sync" + "time" kubeapiserver "k8s.io/apiserver/pkg/server" cmdutil "github.com/microsoft/dcp/internal/commands" "github.com/microsoft/dcp/internal/dcp/commands" + "github.com/microsoft/dcp/internal/telemetry" "github.com/microsoft/dcp/pkg/logger" "github.com/microsoft/dcp/pkg/osutil" "github.com/microsoft/dcp/pkg/resiliency" @@ -46,15 +51,77 @@ func main() { ctx := kubeapiserver.SetupSignalContext() + telemetrySystem := telemetry.GetTelemetrySystem() + var shutdownTelemetryOnce sync.Once + shutdownTelemetry := func() { + shutdownTelemetryOnce.Do(func() { + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + _ = telemetrySystem.Shutdown(shutdownCtx) + }) + } + defer shutdownTelemetry() + + ctx, commandSpan := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanCommand) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeProcessPID, os.Getpid()) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeProcessExecutableName, filepath.Base(os.Args[0])) + if len(os.Args) > 1 { + telemetry.SetAttribute(ctx, telemetry.StartupAttributeCommandArgumentCount, len(os.Args)-1) + } + if commandName := getCommandName(os.Args); commandName != "" { + telemetry.SetAttribute(ctx, telemetry.StartupAttributeCommandName, commandName) + } + spanEnded := false + endCommandSpan := func() { + if !spanEnded { + commandSpan.End() + spanEnded = true + } + } + defer endCommandSpan() + + telemetry.AddEvent(ctx, telemetry.StartupEventCommandRootCommandCreating) root, err := commands.NewRootCmd(log) if err != nil { + telemetry.SetAttribute(ctx, telemetry.StartupAttributeCommandExitCode, errSetup) + commandSpan.SetError(err) + endCommandSpan() + shutdownTelemetry() cmdutil.ErrorExit(log, err, errSetup) } + telemetry.AddEvent(ctx, telemetry.StartupEventCommandRootCommandCreated) + + telemetry.AddEvent(ctx, telemetry.StartupEventCommandExecuteStart) + endCommandSpan() - err = root.ExecuteContext(ctx) + lifetimeCtx, lifetimeSpan := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanCommandLifetime) + if commandName := getCommandName(os.Args); commandName != "" { + telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeCommandName, commandName) + } + telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeCommandArgumentCount, max(len(os.Args)-1, 0)) + + err = root.ExecuteContext(lifetimeCtx) if err != nil { + telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeCommandExitCode, errCommand) + telemetry.AddEvent(lifetimeCtx, telemetry.StartupEventCommandExecuteFailed) + lifetimeSpan.SetError(err) + lifetimeSpan.End() + shutdownTelemetry() cmdutil.ErrorExit(log, err, errCommand) } else { + telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeCommandExitCode, 0) + telemetry.AddEvent(lifetimeCtx, telemetry.StartupEventCommandExecuteSucceeded) + lifetimeSpan.End() log.Flush() } } + +func getCommandName(args []string) string { + for _, arg := range args[1:] { + if arg != "" && arg[0] != '-' { + return arg + } + } + + return "" +} diff --git a/controllers/container_controller.go b/controllers/container_controller.go index b0429fbc..afa0656b 100644 --- a/controllers/container_controller.go +++ b/controllers/container_controller.go @@ -32,6 +32,7 @@ import ( "github.com/microsoft/dcp/internal/containers" "github.com/microsoft/dcp/internal/health" "github.com/microsoft/dcp/internal/networking" + "github.com/microsoft/dcp/internal/telemetry" "github.com/microsoft/dcp/internal/templating" "github.com/microsoft/dcp/internal/version" "github.com/microsoft/dcp/pkg/commonapi" @@ -204,7 +205,13 @@ func (r *ContainerReconciler) SetupWithManager(mgr ctrl.Manager, name string) er Complete(r) } -func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { + ctx, span := r.StartReconciliationSpan(ctx, req) + defer func() { + span.SetError(err) + span.End() + }() + reader, log := r.StartReconciliation(req) if ctx.Err() != nil { @@ -213,7 +220,7 @@ func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } container := apiv1.Container{} - err := reader.Get(ctx, req.NamespacedName, &container) + err = reader.Get(ctx, req.NamespacedName, &container) if err != nil { if apierrors.IsNotFound(err) { @@ -229,6 +236,9 @@ func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( getSucceededCounter.Add(ctx, 1) } + r.SetObjectAttributes(ctx, &container) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceState, string(container.Status.State)) + log = log.WithValues(logger.RESOURCE_LOG_STREAM_ID, container.GetResourceId()) if container.Status.ContainerID != "" { @@ -260,17 +270,24 @@ func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( additionalReconcileDelay = LongDelay } - result, saveErr := r.SaveChangesWithDelay(ctx, &container, patch, change, additionalReconcileDelay, nil, log) - return result, saveErr + result, err = r.SaveChangesWithDelay(ctx, &container, patch, change, additionalReconcileDelay, nil, log) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceFinalState, string(container.Status.State)) + return result, err } func (r *ContainerReconciler) manageContainer(ctx context.Context, container *apiv1.Container, log logr.Logger) objectChange { + ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanContainerManage) + r.SetObjectAttributes(ctx, container) + defer span.End() + targetContainerState := container.Status.State + telemetry.SetAttribute(ctx, telemetry.StartupAttributeContainerTargetState, string(targetContainerState)) _, rcd := r.runningContainers.BorrowByNamespacedName(container.NamespacedName()) if rcd != nil { // In-memory container state is not subject to issues related to caching and // status updates failed due to conflict, so it is fresher and has precedence. targetContainerState = rcd.containerState + telemetry.SetAttribute(ctx, telemetry.StartupAttributeContainerRunState, string(rcd.containerState)) } // Even if the new container state is (as it is usually the case) the same as the target state, diff --git a/controllers/executable_controller.go b/controllers/executable_controller.go index 75662e06..e5359b8c 100644 --- a/controllers/executable_controller.go +++ b/controllers/executable_controller.go @@ -30,6 +30,7 @@ import ( "github.com/microsoft/dcp/internal/health" "github.com/microsoft/dcp/internal/logs" "github.com/microsoft/dcp/internal/networking" + "github.com/microsoft/dcp/internal/telemetry" "github.com/microsoft/dcp/internal/templating" "github.com/microsoft/dcp/pkg/commonapi" "github.com/microsoft/dcp/pkg/concurrency" @@ -142,7 +143,13 @@ Updating the Executable path/working directory/arguments/environment will not ef Status will be updated based on the status of the corresponding run and the run will be terminated if the Executable is deleted. */ -func (r *ExecutableReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *ExecutableReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { + ctx, span := r.StartReconciliationSpan(ctx, req) + defer func() { + span.SetError(err) + span.End() + }() + reader, log := r.StartReconciliation(req) // Check to see if the request context has already expired @@ -169,6 +176,9 @@ func (r *ExecutableReconciler) Reconcile(ctx context.Context, req ctrl.Request) getSucceededCounter.Add(ctx, 1) } + r.SetObjectAttributes(ctx, &exe) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceState, string(exe.Status.State)) + // Route logs to the resource sink log = log.WithValues(logger.RESOURCE_LOG_STREAM_ID, exe.GetResourceId()) @@ -185,7 +195,8 @@ func (r *ExecutableReconciler) Reconcile(ctx context.Context, req ctrl.Request) change = r.manageExecutable(ctx, &exe, log) } - result, err := r.SaveChanges(ctx, &exe, patch, change, nil, log) + result, err = r.SaveChanges(ctx, &exe, patch, change, nil, log) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceFinalState, string(exe.Status.State)) if exe.Done() { log.V(1).Info("Executable reached done state") } @@ -226,12 +237,18 @@ func (r *ExecutableReconciler) handleDeletionRequest(ctx context.Context, exe *a } func (r *ExecutableReconciler) manageExecutable(ctx context.Context, exe *apiv1.Executable, log logr.Logger) objectChange { + ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanExecutableManage) + r.SetObjectAttributes(ctx, exe) + defer span.End() + targetExecutableState := exe.Status.State + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExecutableTargetState, string(targetExecutableState)) _, runInfo := r.runs.BorrowByNamespacedName(exe.NamespacedName()) if runInfo != nil { // In-memory run info is not subject to issues related to caching // and failure to update Executable Status due to conflict, so it takes precedence. targetExecutableState = runInfo.ExeState + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExecutableRunState, string(runInfo.ExeState)) } // Even if Executable.State == targetExecutableState, we still want to run the initializer diff --git a/controllers/reconciler_base.go b/controllers/reconciler_base.go index 5a3129ae..4a349bdd 100644 --- a/controllers/reconciler_base.go +++ b/controllers/reconciler_base.go @@ -14,6 +14,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/go-logr/logr" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -101,6 +102,22 @@ func (rb *ReconcilerBase[T, PT]) StartReconciliation(req ctrl.Request) (ctrl_cli return reader, log } +func (rb *ReconcilerBase[T, PT]) StartReconciliationSpan(ctx context.Context, req ctrl.Request) (context.Context, telemetry.StartupSpan) { + return telemetry.StartStartupSpan( + ctx, + telemetry.StartupSpanControllerReconcile, + attribute.String(telemetry.StartupAttributeControllerKind, rb.kind), + attribute.String(telemetry.StartupAttributeResourceKind, rb.kind), + attribute.String(telemetry.StartupAttributeResourceName, req.Name), + attribute.String(telemetry.StartupAttributeResourceNamespace, req.Namespace), + ) +} + +func (rb *ReconcilerBase[T, PT]) SetObjectAttributes(ctx context.Context, obj PT) { + objectMeta := obj.GetObjectMeta() + telemetry.SetDcpResourceAttributes(ctx, rb.kind, objectMeta.GetNamespace(), objectMeta.GetName(), objectMeta.GetAnnotations()) +} + // Schedules reconciliation for specific object identified by namespaced name. func (rb *ReconcilerBase[T, PT]) ScheduleReconciliation(nn types.NamespacedName) { rb.debouncer.ReconciliationNeeded(rb.LifetimeCtx, nn) diff --git a/controllers/service_controller.go b/controllers/service_controller.go index db33054f..e2da8251 100644 --- a/controllers/service_controller.go +++ b/controllers/service_controller.go @@ -13,7 +13,6 @@ import ( "net" "os" "path/filepath" - "sync/atomic" "github.com/go-logr/logr" "go.opentelemetry.io/otel/trace" @@ -151,11 +150,14 @@ func (r *ServiceReconciler) requestReconcileForEndpoint(ctx context.Context, obj } } -func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues( - "Service", req.NamespacedName.String(), - "Reconciliation", atomic.AddUint32(&r.reconciliationSeqNo, 1), - ) +func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { + ctx, span := r.StartReconciliationSpan(ctx, req) + defer func() { + span.SetError(err) + span.End() + }() + + reader, log := r.StartReconciliation(req) if ctx.Err() != nil { log.V(1).Info("Request context expired, nothing to do...") @@ -163,7 +165,7 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } svc := apiv1.Service{} - if err := r.Get(ctx, req.NamespacedName, &svc); err != nil { + if err := reader.Get(ctx, req.NamespacedName, &svc); err != nil { if apimachinery_errors.IsNotFound(err) { log.V(1).Info("The Service object does not exist yet or was deleted") r.stopService(req.NamespacedName, nil, log) @@ -178,6 +180,9 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct getSucceededCounter.Add(ctx, 1) } + r.SetObjectAttributes(ctx, &svc) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceState, string(svc.Status.State)) + log = log.WithValues(logger.RESOURCE_LOG_STREAM_ID, svc.GetResourceId()) var change objectChange @@ -198,7 +203,8 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } } - result, err := r.SaveChangesWithDelay(ctx, &svc, patch, change, r.config.AdditionalReconciliationDelay, nil, log) + result, err = r.SaveChangesWithDelay(ctx, &svc, patch, change, r.config.AdditionalReconciliationDelay, nil, log) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceFinalState, string(svc.Status.State)) return result, err } @@ -214,7 +220,20 @@ func (r *ServiceReconciler) stopService(svcName types.NamespacedName, svc *apiv1 } func (r *ServiceReconciler) ensureServiceEffectiveAddressAndPort(ctx context.Context, svc *apiv1.Service, log logr.Logger) objectChange { + ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanServiceEnsureEffectiveAddress) + r.SetObjectAttributes(ctx, svc) + defer func() { + telemetry.SetAttribute(ctx, telemetry.StartupAttributeServiceFinalState, string(svc.Status.State)) + if svc.Status.EffectivePort != 0 { + telemetry.SetAttribute(ctx, telemetry.StartupAttributeServiceEffectivePort, int(svc.Status.EffectivePort)) + } + span.End() + }() + oldState := svc.Status.State + telemetry.SetAttribute(ctx, telemetry.StartupAttributeServicePreviousState, string(oldState)) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeServiceAddressAllocation, string(svc.Spec.AddressAllocationMode)) + oldEffectiveAddress := svc.Status.EffectiveAddress oldEffectivePort := svc.Status.EffectivePort oldEndpointNamespacedName := types.NamespacedName{ diff --git a/internal/dcp/bootstrap/dcp_run.go b/internal/dcp/bootstrap/dcp_run.go index d05f173e..1beae220 100644 --- a/internal/dcp/bootstrap/dcp_run.go +++ b/internal/dcp/bootstrap/dcp_run.go @@ -22,6 +22,7 @@ import ( "github.com/microsoft/dcp/internal/hosting" "github.com/microsoft/dcp/internal/notifications" "github.com/microsoft/dcp/internal/perftrace" + "github.com/microsoft/dcp/internal/telemetry" "github.com/microsoft/dcp/pkg/extensions" "github.com/microsoft/dcp/pkg/kubeconfig" "github.com/microsoft/dcp/pkg/logger" @@ -46,7 +47,23 @@ func DcpRun( allExtensions []DcpExtension, invocationFlags []string, log logr.Logger, -) error { +) (err error) { + ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanRun) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeServerOnly, serverOnly) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExtensionsCount, len(allExtensions)) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeInvocationFlagsCount, len(invocationFlags)) + spanEnded := false + endRunSpan := func(runErr error) { + if !spanEnded { + span.SetError(runErr) + span.End() + spanEnded = true + } + } + defer func() { + endRunSpan(err) + }() + // If the context is already complete, we should not proceed with running the API server and controllers. if ctx.Err() != nil { return ctx.Err() @@ -70,6 +87,7 @@ func DcpRun( var notifySrc notifications.UnixSocketNotificationSource if !serverOnly { + notifyCreateCtx, notifyCreateSpan := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanNotificationSourceCreate) // Do not use apiServerCtx for the notification source because it is a monitor context // that gets cancelled monitored process exits, triggering API server shutdown. // We want to be able to send notifications throughout the shutdown process, so we use a separate context. @@ -78,6 +96,9 @@ func DcpRun( var notifySrcErr error notifySrc, notifySrcErr = createNotificationSource(notifyCtx, log) + telemetry.SetAttribute(notifyCreateCtx, telemetry.StartupAttributeNotificationSourceCreated, notifySrcErr == nil) + notifyCreateSpan.SetError(notifySrcErr) + notifyCreateSpan.End() if notifySrcErr == nil { appmgmt.AddBeforeCleanupTask("SendCleanupStartedNotification", func() { // Best effort @@ -91,25 +112,40 @@ func DcpRun( controllerExtensions := slices.Select(allExtensions, func(ext DcpExtension) bool { return slices.Contains(ext.Capabilities, extensions.ControllerCapability) }) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeControllerExtensionsCount, len(controllerExtensions)) + hostedServicesCtx, hostedServicesSpan := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanHostedServicesCreate) hostedServices := []hosting.Service{} + var hostedServicesErr error if !serverOnly { // Always run the built-in controllers controllerService, controllerServiceErr := newRunControllersService(cwd, invocationFlags, log) if controllerServiceErr != nil { - return fmt.Errorf("could not start built-in controllers: %w", controllerServiceErr) + hostedServicesErr = fmt.Errorf("could not start built-in controllers: %w", controllerServiceErr) + } else { + hostedServices = append(hostedServices, controllerService) } - hostedServices = append(hostedServices, controllerService) // Also run any extension controllers for _, ext := range controllerExtensions { + if hostedServicesErr != nil { + break + } + extService, extCreationErr := NewDcpExtensionService(cwd, ext, "run-controllers", invocationFlags, log) if extCreationErr != nil { - return fmt.Errorf("could not start controller extension '%s': %w", ext.Name, extCreationErr) + hostedServicesErr = fmt.Errorf("could not start controller extension '%s': %w", ext.Name, extCreationErr) + break } hostedServices = append(hostedServices, extService) } } + telemetry.SetAttribute(hostedServicesCtx, telemetry.StartupAttributeHostedServicesCount, len(hostedServices)) + hostedServicesSpan.SetError(hostedServicesErr) + hostedServicesSpan.End() + if hostedServicesErr != nil { + return hostedServicesErr + } host := &hosting.Host{ Services: hostedServices, @@ -129,14 +165,27 @@ func DcpRun( return perftrace.StartProfiling(ctx, ctxCancel, perftrace.ProfileTypeSnapshot, log) }, } + telemetry.AddEvent(ctx, telemetry.StartupEventRunApiServerStarting) + apiServerStartCtx, apiServerStartSpan := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanApiServerStart) apiServerShutdown, apiServerErr := apiServer.Run(apiServerCtx, runConfig) + if apiServerErr == nil { + telemetry.AddEvent(apiServerStartCtx, telemetry.StartupEventRunApiServerStarted) + } + apiServerStartSpan.SetError(apiServerErr) + apiServerStartSpan.End() if apiServerErr != nil { return apiServerErr } log.V(1).Info("About to launch host services") + telemetry.AddEvent(ctx, telemetry.StartupEventRunHostedServicesStarting) + hostStartCtx, hostStartSpan := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanHostedServicesStart) + telemetry.SetAttribute(hostStartCtx, telemetry.StartupAttributeHostedServicesCount, len(hostedServices)) shutdownErrors, lifecycleMsgs := host.RunAsync(hostCtx) + telemetry.AddEvent(hostStartCtx, telemetry.StartupEventRunHostedServicesStarted) + hostStartSpan.End() + endRunSpan(nil) shutdownHost := func() error { cancelHostCtx() var allErrors error @@ -151,16 +200,17 @@ func DcpRun( return allErrors } - var err error // Wait for the user to signal that they want to shut down. for { select { case <-cleanupCtx.Done(): // We are being asked to shut down. log.Info("Shutting down...") + telemetry.AddEvent(ctx, telemetry.StartupEventRunShutdownRequested) // Determine what level of resource cleanup is requested. resourceCleanup := requestedResourceCleanup.Load().(apiserver.ApiServerResourceCleanup) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceCleanup, string(resourceCleanup)) log.V(1).Info("Invoking BeforeApiSrvShutdown event handler.", "ResourceCleanup", resourceCleanup) // If we are in server-only mode (no standard controllers) such as when running tests, @@ -183,6 +233,7 @@ func DcpRun( case <-apiServerShutdown: err = fmt.Errorf("API server shut down unexpectedly. Graceful shutdown is not possible.") log.Error(err, "Terminating...") + telemetry.AddEvent(ctx, telemetry.StartupEventRunApiServerStopped) return errors.Join(err, shutdownHost()) case msg, isOpen := <-lifecycleMsgs: @@ -193,6 +244,7 @@ func DcpRun( if msg.Err != nil { log.Error(msg.Err, fmt.Sprintf("Controller '%s' exited with an error. Application may not function correctly.", msg.ServiceName)) + telemetry.AddEvent(ctx, telemetry.StartupEventHostedServiceError) // Let the user decide whether to continue or not, do not break the loop yet. } } diff --git a/internal/dcp/bootstrap/extensions.go b/internal/dcp/bootstrap/extensions.go index 19ca420d..35c3a3ed 100644 --- a/internal/dcp/bootstrap/extensions.go +++ b/internal/dcp/bootstrap/extensions.go @@ -20,6 +20,7 @@ import ( "github.com/go-logr/logr" "github.com/microsoft/dcp/internal/dcppaths" + "github.com/microsoft/dcp/internal/telemetry" "github.com/microsoft/dcp/pkg/extensions" "github.com/microsoft/dcp/pkg/process" "github.com/microsoft/dcp/pkg/slices" @@ -57,13 +58,19 @@ var ( } ) -func GetExtensions(ctx context.Context, log logr.Logger) ([]DcpExtension, error) { +func GetExtensions(ctx context.Context, log logr.Logger) (discoveredExtensions []DcpExtension, err error) { + ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanExtensionsDiscover) + defer func() { + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExtensionsCount, len(discoveredExtensions)) + span.SetError(err) + span.End() + }() + extDirs, err := dcppaths.GetExtensionsDirs() if err != nil { return nil, err } - - extensions := []DcpExtension{} + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExtensionsDirectoryCount, len(extDirs)) for _, extDir := range extDirs { if _, statErr := os.Stat(extDir); statErr != nil { @@ -117,7 +124,7 @@ func GetExtensions(ctx context.Context, log logr.Logger) ([]DcpExtension, error) return capabilityQueryErr } - extensions = append(extensions, ext) + discoveredExtensions = append(discoveredExtensions, ext) } return nil @@ -127,10 +134,24 @@ func GetExtensions(ctx context.Context, log logr.Logger) ([]DcpExtension, error) return nil, fmt.Errorf("could not determine installed DCP extensions: %w", err) } - return extensions, nil + return discoveredExtensions, nil } -func getExtensionCapabilities(ctx context.Context, path string, log logr.Logger) (DcpExtension, error) { +func getExtensionCapabilities(ctx context.Context, path string, log logr.Logger) (ext DcpExtension, err error) { + ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanExtensionGetCapabilities) + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExtensionExecutableName, filepath.Base(path)) + defer func() { + if ext.Id != "" { + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExtensionID, ext.Id) + } + if ext.Name != "" { + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExtensionName, ext.Name) + } + telemetry.SetAttribute(ctx, telemetry.StartupAttributeExtensionCapabilityCount, len(ext.Capabilities)) + span.SetError(err) + span.End() + }() + processExecutor := process.NewOSExecutor(log.WithName("extensions")) defer processExecutor.Dispose() if expandedPath, err := filepath.EvalSymlinks(path); err == nil { diff --git a/internal/dcp/commands/start_api_server.go b/internal/dcp/commands/start_api_server.go index ea90e6a8..51c2d38a 100644 --- a/internal/dcp/commands/start_api_server.go +++ b/internal/dcp/commands/start_api_server.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "os/exec" + "path/filepath" "github.com/go-logr/logr" "github.com/spf13/cobra" @@ -17,6 +18,7 @@ import ( container_flags "github.com/microsoft/dcp/internal/containers/flags" "github.com/microsoft/dcp/internal/dcp/bootstrap" "github.com/microsoft/dcp/internal/perftrace" + "github.com/microsoft/dcp/internal/telemetry" usvc_io "github.com/microsoft/dcp/pkg/io" "github.com/microsoft/dcp/pkg/kubeconfig" "github.com/microsoft/dcp/pkg/logger" @@ -55,13 +57,31 @@ func NewStartApiSrvCommand(log logr.Logger) (*cobra.Command, error) { } func startApiSrv(log logr.Logger) func(cmd *cobra.Command, _ []string) error { - return func(cmd *cobra.Command, _ []string) error { + return func(cmd *cobra.Command, _ []string) (err error) { log = log.WithName("start-apiserver") + cmdCtx, span := telemetry.StartStartupSpan(cmd.Context(), telemetry.StartupSpanStartApiServer) + telemetry.SetAttribute(cmdCtx, telemetry.StartupAttributeCommandName, "start-apiserver") + telemetry.SetAttribute(cmdCtx, telemetry.StartupAttributeDetach, detach) + telemetry.SetAttribute(cmdCtx, telemetry.StartupAttributeServerOnly, serverOnly) + spanEnded := false + endStartupSpan := func(startErr error) { + if !spanEnded { + span.SetError(startErr) + span.End() + spanEnded = true + } + } + defer func() { + endStartupSpan(err) + }() + cmd.SetContext(cmdCtx) - apiServerCtx, apiServerCtxCancel := cmds.GetMonitorContextFromFlags(cmd.Context(), log.WithName("monitor")) + apiServerCtx, apiServerCtxCancel := cmds.GetMonitorContextFromFlags(cmdCtx, log.WithName("monitor")) defer apiServerCtxCancel() + telemetry.AddEvent(cmdCtx, telemetry.StartupEventStartApiServerMonitorConfigured) if detach { + forkCtx, forkSpan := telemetry.StartStartupSpan(cmdCtx, telemetry.StartupSpanStartApiServerFork) args := make([]string, 0, len(os.Args)-2) hasContainerRuntimeFlag := false @@ -80,6 +100,8 @@ func startApiSrv(log logr.Logger) func(cmd *cobra.Command, _ []string) error { } log.V(1).Info("Forking command", "Cmd", os.Args[0], "Args", args) + telemetry.SetAttribute(forkCtx, telemetry.StartupAttributeForkArgumentCount, len(args)) + telemetry.AddEvent(forkCtx, telemetry.StartupEventStartApiServerForkStarting) usvc_io.PreserveSessionFolder() // The forked process will take care of cleaning up the session folder @@ -90,20 +112,28 @@ func startApiSrv(log logr.Logger) func(cmd *cobra.Command, _ []string) error { if err := forked.Start(); err != nil { log.Error(err, "Forked process failed to run") + forkSpan.SetError(err) + forkSpan.End() return err } else { log.V(1).Info("Forked process started", "PID", forked.Process.Pid) + telemetry.SetAttribute(forkCtx, telemetry.StartupAttributeForkPID, forked.Process.Pid) + telemetry.AddEvent(forkCtx, telemetry.StartupEventStartApiServerForkStarted) } if err := forked.Process.Release(); err != nil { log.Error(err, "Release failed for process", "PID", forked.Process.Pid) + forkSpan.SetError(err) + forkSpan.End() return err } + forkSpan.End() + endStartupSpan(nil) return nil } - err := perftrace.CaptureStartupProfileIfRequested(apiServerCtx, log) + err = perftrace.CaptureStartupProfileIfRequested(apiServerCtx, log) if err != nil { log.Error(err, "Failed to capture startup profile") } @@ -111,24 +141,33 @@ func startApiSrv(log logr.Logger) func(cmd *cobra.Command, _ []string) error { defer usvc_io.CleanupSessionFolderIfNeeded() if rootDir == "" { + telemetry.AddEvent(cmdCtx, telemetry.StartupEventStartApiServerRootDirResolving) rootDir, err = os.Getwd() if err != nil { return fmt.Errorf("could not determine the working directory: %w", err) } } + telemetry.SetAttribute(cmdCtx, telemetry.StartupAttributeRootDirectoryBasename, filepath.Base(rootDir)) + kubeconfigCtx, kubeconfigSpan := telemetry.StartStartupSpan(cmdCtx, telemetry.StartupSpanStartApiServerEnsureKubeconfig) kconfig, err := kubeconfig.EnsureKubeconfigData(cmd.Flags(), log) + if err == nil { + telemetry.AddEvent(kubeconfigCtx, telemetry.StartupEventStartApiServerKubeconfigReady) + } + kubeconfigSpan.SetError(err) + kubeconfigSpan.End() if err != nil { return err } var allExtensions []bootstrap.DcpExtension if !serverOnly { - allExtensions, err = bootstrap.GetExtensions(apiServerCtx, log) + allExtensions, err = bootstrap.GetExtensions(cmdCtx, log) if err != nil { return err } } + telemetry.SetAttribute(cmdCtx, telemetry.StartupAttributeExtensionsCount, len(allExtensions)) invocationFlags := []string{"--kubeconfig", kconfig.Path()} if container_flags.GetRuntimeFlagValue() != container_flags.UnknownRuntime { @@ -137,9 +176,15 @@ func startApiSrv(log logr.Logger) func(cmd *cobra.Command, _ []string) error { if verbosityArg := logger.GetVerbosityArg(cmd.Flags()); verbosityArg != "" { invocationFlags = append(invocationFlags, verbosityArg) } + telemetry.SetAttribute(cmdCtx, telemetry.StartupAttributeInvocationFlagsCount, len(invocationFlags)) + + endStartupSpan(nil) + lifetimeCtx, lifetimeSpan := telemetry.StartStartupSpan(apiServerCtx, telemetry.StartupSpanStartApiServerLifetime) + telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeServerOnly, serverOnly) + telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeExtensionsCount, len(allExtensions)) err = bootstrap.DcpRun( - apiServerCtx, + lifetimeCtx, rootDir, kconfig, serverOnly, @@ -147,6 +192,8 @@ func startApiSrv(log logr.Logger) func(cmd *cobra.Command, _ []string) error { invocationFlags, log, ) + lifetimeSpan.SetError(err) + lifetimeSpan.End() return err } diff --git a/internal/dcpctrl/commands/run_controllers.go b/internal/dcpctrl/commands/run_controllers.go index 24f80ace..ca572648 100644 --- a/internal/dcpctrl/commands/run_controllers.go +++ b/internal/dcpctrl/commands/run_controllers.go @@ -29,6 +29,7 @@ import ( "github.com/microsoft/dcp/internal/notifications" "github.com/microsoft/dcp/internal/perftrace" "github.com/microsoft/dcp/internal/proxy" + "github.com/microsoft/dcp/internal/telemetry" "github.com/microsoft/dcp/pkg/kubeconfig" "github.com/microsoft/dcp/pkg/logger" "github.com/microsoft/dcp/pkg/process" @@ -63,7 +64,13 @@ func NewRunControllersCommand(log *logger.Logger) *cobra.Command { return runControllersCmd } -func getManager(ctx context.Context, log logr.Logger) (ctrl_manager.Manager, error) { +func getManager(ctx context.Context, log logr.Logger) (mgr ctrl_manager.Manager, err error) { + ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanControllersCreateManager) + defer func() { + span.SetError(err) + span.End() + }() + retryCtx, cancelRetryCtx := context.WithTimeout(ctx, dcpclient.DefaultServerConnectTimeout) defer cancelRetryCtx() @@ -71,7 +78,7 @@ func getManager(ctx context.Context, log logr.Logger) (ctrl_manager.Manager, err // Depending on the usage pattern, the API server may not be available immediately. // Do some retries with exponential back-off before giving up - mgr, err := resiliency.RetryGetExponential(retryCtx, func() (ctrl_manager.Manager, error) { + mgr, err = resiliency.RetryGetExponential(retryCtx, func() (ctrl_manager.Manager, error) { config := ctrlruntime.GetConfigOrDie() dcpclient.ApplyDcpOptions(config) ctrlMgrOpts := controllers.NewControllerManagerOptions(ctx, scheme, log) @@ -101,14 +108,22 @@ func getManager(ctx context.Context, log logr.Logger) (ctrl_manager.Manager, err } func runControllers(log logr.Logger) func(cmd *cobra.Command, _ []string) error { - return func(cmd *cobra.Command, _ []string) error { - err := perftrace.CaptureStartupProfileIfRequested(cmd.Context(), log) + return func(cmd *cobra.Command, _ []string) (err error) { + cmdCtx, span := telemetry.StartStartupSpan(cmd.Context(), telemetry.StartupSpanControllersRun) + telemetry.SetAttribute(cmdCtx, telemetry.StartupAttributeCommandName, "run-controllers") + defer func() { + span.SetError(err) + span.End() + }() + cmd.SetContext(cmdCtx) + + err = perftrace.CaptureStartupProfileIfRequested(cmdCtx, log) if err != nil { log.Error(err, "failed to capture startup profile") } defer logger.ReleaseAllResourceLogs() - ctrlCtx, ctrlCtxCancel := cmds.GetMonitorContextFromFlags(cmd.Context(), log) + ctrlCtx, ctrlCtxCancel := cmds.GetMonitorContextFromFlags(cmdCtx, log) defer ctrlCtxCancel() _, err = kubeconfig.RequireKubeconfigFlagValue(cmd.Flags()) @@ -276,8 +291,11 @@ func runControllers(log logr.Logger) func(cmd *cobra.Command, _ []string) error go func() { log.Info("Starting controller manager") var mgrRunErr error + managerCtx, managerSpan := telemetry.StartStartupSpan(ctrlCtx, telemetry.StartupSpanControllersManagerStart) defer func() { + managerSpan.SetError(mgrRunErr) + managerSpan.End() panicErr := resiliency.MakePanicError(recover(), log) if panicErr != nil { // Already logged by MakePanicError() @@ -292,7 +310,7 @@ func runControllers(log logr.Logger) func(cmd *cobra.Command, _ []string) error close(mgrRunResultCh) }() - mgrRunErr = mgr.Start(ctrlCtx) + mgrRunErr = mgr.Start(managerCtx) }() <-ctrlCtx.Done() diff --git a/internal/telemetry/exporter.go b/internal/telemetry/exporter.go index a216738e..8435aad2 100644 --- a/internal/telemetry/exporter.go +++ b/internal/telemetry/exporter.go @@ -3,84 +3,162 @@ * Licensed under the MIT License. See LICENSE in the project root for license information. *--------------------------------------------------------------------------------------------*/ - -package telemetry - -import ( - "context" - "fmt" - "os" - "path/filepath" - - usvc_io "github.com/microsoft/dcp/pkg/io" - "github.com/microsoft/dcp/pkg/logger" - "github.com/microsoft/dcp/pkg/osutil" - "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" - "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.uber.org/zap/zapcore" -) - -func newTraceExporter(logName string) (sdktrace.SpanExporter, error) { - logLevel, err := logger.GetDiagnosticsLogLevel() - - if err == nil && logLevel == zapcore.DebugLevel { - logFolder, logFolderErr := logger.EnsureDiagnosticsLogsFolder() - - if logFolderErr != nil { - return nil, logFolderErr - } - - telemetryFileName := fmt.Sprintf("%s-%s-telemetry-%s.json", logger.SessionId(), logName, logger.ProcessMomentHash(logger.PlainHash)) - telemetryFile, logFileErr := usvc_io.OpenFile(filepath.Join(logFolder, telemetryFileName), os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_TRUNC, osutil.PermissionOnlyOwnerReadWrite) - - if logFileErr != nil { - return nil, logFileErr - } - - return stdouttrace.New(stdouttrace.WithPrettyPrint(), stdouttrace.WithWriter(telemetryFile)) - } else { - return discardExporter{}, nil - } -} - -func newMetricExporter() (sdkmetric.Exporter, error) { - logLevel, err := logger.GetDiagnosticsLogLevel() - - if err == nil && logLevel == zapcore.DebugLevel { - return stdoutmetric.New() - } else { - return discardExporter{}, nil - } -} - -type discardExporter struct{} - -func (discardExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { - return nil -} - -func (discardExporter) Export(context.Context, *metricdata.ResourceMetrics) error { - return nil -} - -func (discardExporter) ForceFlush(context.Context) error { - return nil -} - -func (discardExporter) Shutdown(ctx context.Context) error { - return nil -} - -func (discardExporter) Aggregation(ik sdkmetric.InstrumentKind) sdkmetric.Aggregation { - return sdkmetric.DefaultAggregationSelector(ik) -} - -func (discardExporter) Temporality(ik sdkmetric.InstrumentKind) metricdata.Temporality { - return sdkmetric.DefaultTemporalitySelector(ik) -} - -var _ sdktrace.SpanExporter = discardExporter{} -var _ sdkmetric.Exporter = discardExporter{} +package telemetry + +import ( + "context" + "fmt" + "net/url" + "os" + "path/filepath" + "strings" + + usvc_io "github.com/microsoft/dcp/pkg/io" + "github.com/microsoft/dcp/pkg/logger" + "github.com/microsoft/dcp/pkg/osutil" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.uber.org/zap/zapcore" +) + +const ( + otelExporterOtlpEndpointEnvVar = "OTEL_EXPORTER_OTLP_ENDPOINT" + otelExporterOtlpTracesEndpointEnvVar = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" + otelExporterOtlpHeadersEnvVar = "OTEL_EXPORTER_OTLP_HEADERS" + otelExporterOtlpTracesHeadersEnvVar = "OTEL_EXPORTER_OTLP_TRACES_HEADERS" + otelExporterOtlpProtocolEnvVar = "OTEL_EXPORTER_OTLP_PROTOCOL" + aspireDashboardOtlpEndpointEnvVar = "ASPIRE_DASHBOARD_OTLP_ENDPOINT_URL" +) + +func newTraceExporter(logName string) (sdktrace.SpanExporter, error) { + if otlpEndpoint := getOtlpEndpoint(); otlpEndpoint != "" { + return newOtlpTraceExporter(context.Background(), otlpEndpoint) + } + + logLevel, err := logger.GetDiagnosticsLogLevel() + + if err == nil && logLevel == zapcore.DebugLevel { + logFolder, logFolderErr := logger.EnsureDiagnosticsLogsFolder() + + if logFolderErr != nil { + return nil, logFolderErr + } + + telemetryFileName := fmt.Sprintf("%s-%s-telemetry-%s.json", logger.SessionId(), logName, logger.ProcessMomentHash(logger.PlainHash)) + telemetryFile, logFileErr := usvc_io.OpenFile(filepath.Join(logFolder, telemetryFileName), os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_TRUNC, osutil.PermissionOnlyOwnerReadWrite) + + if logFileErr != nil { + return nil, logFileErr + } + + return stdouttrace.New(stdouttrace.WithPrettyPrint(), stdouttrace.WithWriter(telemetryFile)) + } else { + return discardExporter{}, nil + } +} + +func getOtlpEndpoint() string { + if endpoint := os.Getenv(otelExporterOtlpTracesEndpointEnvVar); endpoint != "" { + return endpoint + } + + if endpoint := os.Getenv(otelExporterOtlpEndpointEnvVar); endpoint != "" { + return endpoint + } + + return os.Getenv(aspireDashboardOtlpEndpointEnvVar) +} + +func newOtlpTraceExporter(ctx context.Context, endpoint string) (sdktrace.SpanExporter, error) { + protocol := strings.ToLower(os.Getenv(otelExporterOtlpProtocolEnvVar)) + if protocol != "" && protocol != "grpc" { + return nil, fmt.Errorf("unsupported OTLP trace exporter protocol %q", protocol) + } + + options := []otlptracegrpc.Option{} + if parsedEndpoint, err := url.Parse(endpoint); err == nil && parsedEndpoint.Host != "" { + options = append(options, otlptracegrpc.WithEndpoint(parsedEndpoint.Host)) + if parsedEndpoint.Scheme == "http" { + options = append(options, otlptracegrpc.WithInsecure()) + } + } else { + options = append(options, otlptracegrpc.WithEndpoint(endpoint)) + } + + if headers := getOtlpHeaders(); len(headers) > 0 { + options = append(options, otlptracegrpc.WithHeaders(headers)) + } + + return otlptracegrpc.New(ctx, options...) +} + +func getOtlpHeaders() map[string]string { + headers := parseOtlpHeaders(os.Getenv(otelExporterOtlpTracesHeadersEnvVar)) + if len(headers) > 0 { + return headers + } + + return parseOtlpHeaders(os.Getenv(otelExporterOtlpHeadersEnvVar)) +} + +func parseOtlpHeaders(rawHeaders string) map[string]string { + headers := map[string]string{} + for _, pair := range strings.Split(rawHeaders, ",") { + key, value, found := strings.Cut(pair, "=") + if !found { + continue + } + + key = strings.TrimSpace(key) + if key == "" { + continue + } + + headers[key] = strings.TrimSpace(value) + } + + return headers +} + +func newMetricExporter() (sdkmetric.Exporter, error) { + logLevel, err := logger.GetDiagnosticsLogLevel() + + if err == nil && logLevel == zapcore.DebugLevel { + return stdoutmetric.New() + } else { + return discardExporter{}, nil + } +} + +type discardExporter struct{} + +func (discardExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + return nil +} + +func (discardExporter) Export(context.Context, *metricdata.ResourceMetrics) error { + return nil +} + +func (discardExporter) ForceFlush(context.Context) error { + return nil +} + +func (discardExporter) Shutdown(ctx context.Context) error { + return nil +} + +func (discardExporter) Aggregation(ik sdkmetric.InstrumentKind) sdkmetric.Aggregation { + return sdkmetric.DefaultAggregationSelector(ik) +} + +func (discardExporter) Temporality(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return sdkmetric.DefaultTemporalitySelector(ik) +} + +var _ sdktrace.SpanExporter = discardExporter{} +var _ sdkmetric.Exporter = discardExporter{} diff --git a/internal/telemetry/startup_context.go b/internal/telemetry/startup_context.go new file mode 100644 index 00000000..6d01cfd4 --- /dev/null +++ b/internal/telemetry/startup_context.go @@ -0,0 +1,265 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package telemetry + +import ( + "context" + "os" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +const ( + StartupTracerName = "dcp.startup" + + StartupSpanCommand = "dcp.command" + StartupSpanCommandLifetime = "dcp.command.lifetime" + StartupSpanStartApiServer = "dcp.start_apiserver" + StartupSpanStartApiServerFork = "dcp.start_apiserver.fork" + StartupSpanStartApiServerEnsureKubeconfig = "dcp.start_apiserver.ensure_kubeconfig" + StartupSpanStartApiServerLifetime = "dcp.start_apiserver.lifetime" + StartupSpanRun = "dcp.run" + StartupSpanNotificationSourceCreate = "dcp.notification_source.create" + StartupSpanHostedServicesCreate = "dcp.hosted_services.create" + StartupSpanApiServerStart = "dcp.apiserver.start" + StartupSpanHostedServicesStart = "dcp.hosted_services.start" + StartupSpanExtensionsDiscover = "dcp.extensions.discover" + StartupSpanExtensionGetCapabilities = "dcp.extension.get_capabilities" + StartupSpanControllersCreateManager = "dcp.controllers.create_manager" + StartupSpanControllersRun = "dcp.controllers.run" + StartupSpanControllersManagerStart = "dcp.controllers.manager.start" + StartupSpanControllerReconcile = "dcp.controller.reconcile" + StartupSpanExecutableManage = "dcp.executable.manage" + StartupSpanContainerManage = "dcp.container.manage" + StartupSpanServiceEnsureEffectiveAddress = "dcp.service.ensure_effective_address" + + StartupEventCommandRootCommandCreating = "dcp.command.root_command.creating" + StartupEventCommandRootCommandCreated = "dcp.command.root_command.created" + StartupEventCommandExecuteStart = "dcp.command.execute.start" + StartupEventCommandExecuteFailed = "dcp.command.execute.failed" + StartupEventCommandExecuteSucceeded = "dcp.command.execute.succeeded" + StartupEventStartApiServerMonitorConfigured = "dcp.start_apiserver.monitor_configured" + StartupEventStartApiServerForkStarting = "dcp.start_apiserver.fork.starting" + StartupEventStartApiServerForkStarted = "dcp.start_apiserver.fork.started" + StartupEventStartApiServerRootDirResolving = "dcp.start_apiserver.root_dir.resolving" + StartupEventStartApiServerKubeconfigReady = "dcp.start_apiserver.kubeconfig_ready" + StartupEventRunApiServerStarting = "dcp.run.apiserver.starting" + StartupEventRunApiServerStarted = "dcp.apiserver.started" + StartupEventRunHostedServicesStarting = "dcp.run.hosted_services.starting" + StartupEventRunHostedServicesStarted = "dcp.hosted_services.started" + StartupEventHostedServiceError = "dcp.hosted_service.error" + StartupEventRunShutdownRequested = "dcp.run.shutdown_requested" + StartupEventRunApiServerStopped = "dcp.run.apiserver_stopped" + + StartupAttributeProcessPID = "process.pid" + StartupAttributeProcessExecutableName = "process.executable.name" + StartupAttributeCommandArgumentCount = "dcp.command.argument_count" + StartupAttributeCommandName = "dcp.command.name" + StartupAttributeCommandExitCode = "dcp.command.exit_code" + StartupAttributeDetach = "dcp.detach" + StartupAttributeServerOnly = "dcp.server_only" + StartupAttributeForkArgumentCount = "dcp.fork.argument_count" + StartupAttributeForkPID = "dcp.fork.pid" + StartupAttributeRootDirectoryBasename = "dcp.root_directory.basename" + StartupAttributeExtensionsCount = "dcp.extensions.count" + StartupAttributeExtensionsDirectoryCount = "dcp.extensions.directory_count" + StartupAttributeInvocationFlagsCount = "dcp.invocation_flags.count" + StartupAttributeControllerExtensionsCount = "dcp.controller_extensions.count" + StartupAttributeHostedServicesCount = "dcp.hosted_services.count" + StartupAttributeNotificationSourceCreated = "dcp.notification_source.created" + StartupAttributeResourceCleanup = "dcp.resource_cleanup" + StartupAttributeExtensionExecutableName = "dcp.extension.executable_name" + StartupAttributeExtensionID = "dcp.extension.id" + StartupAttributeExtensionName = "dcp.extension.name" + StartupAttributeExtensionCapabilityCount = "dcp.extension.capability_count" + StartupAttributeControllerKind = "dcp.controller.kind" + StartupAttributeResourceKind = "dcp.resource.kind" + StartupAttributeResourceName = "dcp.resource.name" + StartupAttributeResourceNamespace = "dcp.resource.namespace" + StartupAttributeResourceState = "dcp.resource.state" + StartupAttributeResourceFinalState = "dcp.resource.final_state" + StartupAttributeExecutableTargetState = "dcp.executable.target_state" + StartupAttributeExecutableRunState = "dcp.executable.run_state" + StartupAttributeContainerTargetState = "dcp.container.target_state" + StartupAttributeContainerRunState = "dcp.container.run_state" + StartupAttributeServiceFinalState = "dcp.service.final_state" + StartupAttributeServiceEffectivePort = "dcp.service.effective_port" + StartupAttributeServicePreviousState = "dcp.service.previous_state" + StartupAttributeServiceAddressAllocation = "dcp.service.address_allocation_mode" + StartupAttributeAspireResourceName = "aspire.resource.name" + StartupAttributeAspireLinkType = "aspire.link.type" + + StartupOperationIDEnvVar = "ASPIRE_STARTUP_OPERATION_ID" + StartupTraceParentEnvVar = "ASPIRE_STARTUP_TRACEPARENT" + StartupTraceStateEnvVar = "ASPIRE_STARTUP_TRACESTATE" + + StartupOperationIDAttribute = "aspire.startup.operation_id" + + StartupOperationIDAnnotation = "aspire-startup-operation-id" + StartupTraceParentAnnotation = "aspire-startup-traceparent" + StartupTraceStateAnnotation = "aspire-startup-tracestate" + + HostingDcpCreateObjectIDAttribute = "aspire.hosting.dcp.create_object.id" + HostingDcpCreateObjectKindAttribute = "aspire.hosting.dcp.create_object.kind" + HostingDcpCreateObjectNameAttribute = "aspire.hosting.dcp.create_object.name" + HostingDcpCreateObjectTraceIDAttribute = "aspire.hosting.dcp.create_object.trace_id" + HostingDcpCreateObjectSpanIDAttribute = "aspire.hosting.dcp.create_object.span_id" + + ResourceNameAnnotation = "resource-name" +) + +type StartupSpan struct { + span trace.Span +} + +func (startupSpan StartupSpan) End() { + startupSpan.span.End() +} + +func (startupSpan StartupSpan) SetError(err error) { + SetError(startupSpan.span, err) +} + +func (startupSpan StartupSpan) EndWithError(err error) { + startupSpan.SetError(err) + startupSpan.End() +} + +type StartupContext struct { + OperationID string + TraceParent string + TraceState string +} + +func StartupContextFromEnvironment() (StartupContext, bool) { + operationID := os.Getenv(StartupOperationIDEnvVar) + if operationID == "" { + return StartupContext{}, false + } + + return StartupContext{ + OperationID: operationID, + TraceParent: os.Getenv(StartupTraceParentEnvVar), + TraceState: os.Getenv(StartupTraceStateEnvVar), + }, true +} + +func StartupContextFromAnnotations(annotations map[string]string) (StartupContext, bool) { + if len(annotations) == 0 { + return StartupContext{}, false + } + + startupContext := StartupContext{ + OperationID: annotations[StartupOperationIDAnnotation], + TraceParent: annotations[StartupTraceParentAnnotation], + TraceState: annotations[StartupTraceStateAnnotation], + } + + return startupContext, startupContext.OperationID != "" || startupContext.TraceParent != "" +} + +func (startupContext StartupContext) Attributes() []attribute.KeyValue { + if startupContext.OperationID == "" { + return nil + } + + return []attribute.KeyValue{ + attribute.String(StartupOperationIDAttribute, startupContext.OperationID), + } +} + +func (startupContext StartupContext) RemoteParentContext(parentCtx context.Context) context.Context { + if startupContext.TraceParent == "" { + return parentCtx + } + + return propagation.TraceContext{}.Extract(parentCtx, propagation.MapCarrier{ + "traceparent": startupContext.TraceParent, + "tracestate": startupContext.TraceState, + }) +} + +func (startupContext StartupContext) RemoteSpanContext() trace.SpanContext { + if startupContext.TraceParent == "" { + return trace.SpanContext{} + } + + return trace.SpanContextFromContext(startupContext.RemoteParentContext(context.Background())) +} + +func SetStartupAttributes(ctx context.Context, startupContext StartupContext) { + span := trace.SpanFromContext(ctx) + span.SetAttributes(startupContext.Attributes()...) +} + +func StartStartupSpan(parentCtx context.Context, spanName string, attributes ...attribute.KeyValue) (context.Context, StartupSpan) { + startupContext, ok := StartupContextFromEnvironment() + if ok { + if !trace.SpanContextFromContext(parentCtx).IsValid() { + parentCtx = startupContext.RemoteParentContext(parentCtx) + } + + attributes = append(startupContext.Attributes(), attributes...) + } + + ctx, span := GetTracer(StartupTracerName).Start(parentCtx, spanName, trace.WithAttributes(attributes...)) + return ctx, StartupSpan{span: span} +} + +func SetDcpResourceAttributes(ctx context.Context, kind string, namespace string, name string, annotations map[string]string) { + SetAttribute(ctx, StartupAttributeResourceKind, kind) + SetAttribute(ctx, StartupAttributeResourceName, name) + + if namespace != "" { + SetAttribute(ctx, StartupAttributeResourceNamespace, namespace) + } + + if annotations != nil { + if resourceName := annotations[ResourceNameAnnotation]; resourceName != "" { + SetAttribute(ctx, StartupAttributeAspireResourceName, resourceName) + } + + SetAspireStartupAnnotationAttributes(ctx, annotations) + if _, ok := StartupContextFromAnnotations(annotations); ok { + SetAttribute(ctx, HostingDcpCreateObjectIDAttribute, dcpCreateObjectID(kind, namespace, name)) + SetAttribute(ctx, HostingDcpCreateObjectKindAttribute, kind) + SetAttribute(ctx, HostingDcpCreateObjectNameAttribute, name) + } + } +} + +func SetAspireStartupAnnotationAttributes(ctx context.Context, annotations map[string]string) { + startupContext, ok := StartupContextFromAnnotations(annotations) + if !ok { + return + } + + span := trace.SpanFromContext(ctx) + span.SetAttributes(startupContext.Attributes()...) + + if spanContext := startupContext.RemoteSpanContext(); spanContext.IsValid() { + span.SetAttributes( + attribute.String(HostingDcpCreateObjectTraceIDAttribute, spanContext.TraceID().String()), + attribute.String(HostingDcpCreateObjectSpanIDAttribute, spanContext.SpanID().String()), + ) + span.AddLink(trace.Link{ + SpanContext: spanContext, + Attributes: []attribute.KeyValue{ + attribute.String(StartupAttributeAspireLinkType, "hosting.dcp.create_object"), + }, + }) + } +} + +func dcpCreateObjectID(kind string, namespace string, name string) string { + if namespace == "" { + return kind + "/" + name + } + + return kind + "/" + namespace + "/" + name +} diff --git a/internal/telemetry/startup_context_test.go b/internal/telemetry/startup_context_test.go new file mode 100644 index 00000000..557a6033 --- /dev/null +++ b/internal/telemetry/startup_context_test.go @@ -0,0 +1,65 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package telemetry + +import ( + "context" + "testing" + + "go.opentelemetry.io/otel/trace" +) + +func TestStartupContextFromEnvironmentReturnsFalseWhenOperationIDMissing(t *testing.T) { + t.Setenv(StartupOperationIDEnvVar, "") + t.Setenv(StartupTraceParentEnvVar, "00-0102030405060708090a0b0c0d0e0f10-1112131415161718-01") + + _, ok := StartupContextFromEnvironment() + if ok { + t.Fatal("expected startup context to be missing") + } +} + +func TestStartupContextFromEnvironmentReadsCorrelationValues(t *testing.T) { + t.Setenv(StartupOperationIDEnvVar, "operation-1") + t.Setenv(StartupTraceParentEnvVar, "00-0102030405060708090a0b0c0d0e0f10-1112131415161718-01") + t.Setenv(StartupTraceStateEnvVar, "state-1") + + startupContext, ok := StartupContextFromEnvironment() + if !ok { + t.Fatal("expected startup context") + } + + if startupContext.OperationID != "operation-1" { + t.Fatalf("unexpected operation id: %q", startupContext.OperationID) + } + if startupContext.TraceParent != "00-0102030405060708090a0b0c0d0e0f10-1112131415161718-01" { + t.Fatalf("unexpected traceparent: %q", startupContext.TraceParent) + } + if startupContext.TraceState != "state-1" { + t.Fatalf("unexpected tracestate: %q", startupContext.TraceState) + } +} + +func TestRemoteParentContextExtractsTraceContext(t *testing.T) { + startupContext := StartupContext{ + OperationID: "operation-1", + TraceParent: "00-0102030405060708090a0b0c0d0e0f10-1112131415161718-01", + TraceState: "state-1", + } + + ctx := startupContext.RemoteParentContext(context.Background()) + spanContext := trace.SpanContextFromContext(ctx) + + if !spanContext.IsRemote() { + t.Fatal("expected remote span context") + } + if spanContext.TraceID().String() != "0102030405060708090a0b0c0d0e0f10" { + t.Fatalf("unexpected trace id: %q", spanContext.TraceID().String()) + } + if spanContext.SpanID().String() != "1112131415161718" { + t.Fatalf("unexpected span id: %q", spanContext.SpanID().String()) + } +} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 2f5af71f..2b8c655d 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -3,184 +3,214 @@ * Licensed under the MIT License. See LICENSE in the project root for license information. *--------------------------------------------------------------------------------------------*/ - -package telemetry - -import ( - "context" - "crypto/sha256" - "errors" - "fmt" - "os" - "path/filepath" - "sync" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" - - "github.com/microsoft/dcp/pkg/osutil" -) - -type TelemetrySystem struct { - TracerProvider *sdktrace.TracerProvider - MeterProvider *sdkmetric.MeterProvider - spanExporter sdktrace.SpanExporter - metricExporter sdkmetric.Exporter -} - -var instance *TelemetrySystem -var once sync.Once - -func GetTelemetrySystem() *TelemetrySystem { - once.Do(func() { - logName := filepath.Base(os.Args[0]) - if osutil.IsWindows() { - logName = logName[:len(logName)-len(filepath.Ext(logName))] - } - spanExp, err := newTraceExporter(logName) - if err != nil { - // TODO: report error - spanExp = discardExporter{} - } - - tp := sdktrace.NewTracerProvider( - sdktrace.WithSpanProcessor(NewSuppressIfSuccessfulSpanProcessor(sdktrace.NewBatchSpanProcessor(spanExp))), - sdktrace.WithResource(resource.Default()), - ) - - metricExp, err := newMetricExporter() - if err != nil { - // TODO: report error - metricExp = discardExporter{} - } - - mp := sdkmetric.NewMeterProvider( - sdkmetric.WithReader( - sdkmetric.NewPeriodicReader(metricExp, sdkmetric.WithInterval(1*time.Minute)), - ), - ) - - otel.SetTracerProvider(tp) - // otel.SetMeterProvider(mp) // TODO: Not supported in otel 1.10.0 - - instance = &TelemetrySystem{ - TracerProvider: tp, - MeterProvider: mp, - spanExporter: spanExp, - metricExporter: metricExp, - } - }) - - return instance -} - -func GetTracer(tracerName string) trace.Tracer { - ts := GetTelemetrySystem() - return ts.TracerProvider.Tracer(tracerName) -} - -func GetMeter(meterName string) metric.Meter { - ts := GetTelemetrySystem() - return ts.MeterProvider.Meter(meterName) -} - -func (ts TelemetrySystem) Shutdown(ctx context.Context) error { - return errors.Join( - ts.TracerProvider.Shutdown(ctx), - ts.MeterProvider.Shutdown(ctx), - ts.spanExporter.Shutdown(ctx), - ts.metricExporter.Shutdown(ctx), - ) -} - -func CallWithTelemetryOnErrorOnly[TResult any](tracer trace.Tracer, spanName string, parentCtx context.Context, fn func(ctx context.Context) (TResult, error)) (TResult, error) { - spanCtx, span := tracer.Start(parentCtx, spanName, trace.WithAttributes(attribute.Bool(suppressIfSuccessful, true))) - defer span.End() - - result, err := fn(spanCtx) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - return result, err -} - -func CallWithTelemetry[TResult any](tracer trace.Tracer, spanName string, parentCtx context.Context, fn func(ctx context.Context) (TResult, error)) (TResult, error) { - spanCtx, span := tracer.Start(parentCtx, spanName) - defer span.End() - - result, err := fn(spanCtx) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - return result, err -} - -func CallWithTelemetryNoResult(tracer trace.Tracer, spanName string, parentCtx context.Context, fn func(ctx context.Context) error) error { - spanCtx, span := tracer.Start(parentCtx, spanName) - defer span.End() - - err := fn(spanCtx) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - return err -} - -type TelemetryAttribute interface { - int | int64 | bool | float64 | string | []int | []int64 | []bool | []float64 | []string -} - -func SetAttribute[T TelemetryAttribute](ctx context.Context, key string, value T) { - span := trace.SpanFromContext(ctx) - - switch v := (any)(value).(type) { - case int: - span.SetAttributes(attribute.Int(key, v)) - case int64: - span.SetAttributes(attribute.Int64(key, v)) - case bool: - span.SetAttributes(attribute.Bool(key, v)) - case float64: - span.SetAttributes(attribute.Float64(key, v)) - case string: - span.SetAttributes(attribute.String(key, v)) - case []int: - span.SetAttributes(attribute.IntSlice(key, v)) - case []int64: - span.SetAttributes(attribute.Int64Slice(key, v)) - case []bool: - span.SetAttributes(attribute.BoolSlice(key, v)) - case []float64: - span.SetAttributes(attribute.Float64Slice(key, v)) - case []string: - span.SetAttributes(attribute.StringSlice(key, v)) - default: - // This should never happen - fmt.Printf("unknown telemetry type for key %s", key) - } -} - -func AddEvent(ctx context.Context, name string, options ...trace.EventOption) { - span := trace.SpanFromContext(ctx) - span.AddEvent(name, options...) -} - -var hash = sha256.New() - -func HashValue(value string) string { - hash.Reset() - - hash.Write([]byte(value)) - hashBytes := hash.Sum(nil) - return fmt.Sprintf("%x", hashBytes) -} +package telemetry + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + + "github.com/microsoft/dcp/internal/version" + "github.com/microsoft/dcp/pkg/osutil" +) + +type TelemetrySystem struct { + TracerProvider *sdktrace.TracerProvider + MeterProvider *sdkmetric.MeterProvider + spanExporter sdktrace.SpanExporter + metricExporter sdkmetric.Exporter +} + +var instance *TelemetrySystem +var once sync.Once + +func GetTelemetrySystem() *TelemetrySystem { + once.Do(func() { + logName := filepath.Base(os.Args[0]) + if osutil.IsWindows() { + logName = logName[:len(logName)-len(filepath.Ext(logName))] + } + spanExp, err := newTraceExporter(logName) + if err != nil { + // TODO: report error + spanExp = discardExporter{} + } + + tp := sdktrace.NewTracerProvider( + sdktrace.WithSpanProcessor(NewSuppressIfSuccessfulSpanProcessor(sdktrace.NewBatchSpanProcessor(spanExp))), + sdktrace.WithResource(newTelemetryResource(logName)), + ) + + metricExp, err := newMetricExporter() + if err != nil { + // TODO: report error + metricExp = discardExporter{} + } + + mp := sdkmetric.NewMeterProvider( + sdkmetric.WithReader( + sdkmetric.NewPeriodicReader(metricExp, sdkmetric.WithInterval(1*time.Minute)), + ), + ) + + otel.SetTracerProvider(tp) + // otel.SetMeterProvider(mp) // TODO: Not supported in otel 1.10.0 + + instance = &TelemetrySystem{ + TracerProvider: tp, + MeterProvider: mp, + spanExporter: spanExp, + metricExporter: metricExp, + } + }) + + return instance +} + +func newTelemetryResource(logName string) *resource.Resource { + versionInfo := version.Version() + attributes := []attribute.KeyValue{ + attribute.String("service.name", logName), + attribute.String("service.instance.id", fmt.Sprintf("%d", os.Getpid())), + attribute.Int("process.pid", os.Getpid()), + attribute.String("process.executable.name", logName), + } + if versionInfo.Version != "" { + attributes = append(attributes, attribute.String("service.version", versionInfo.Version)) + } + if versionInfo.CommitHash != "" { + attributes = append(attributes, attribute.String("service.version.revision", versionInfo.CommitHash)) + } + + return resource.NewWithAttributes("", attributes...) +} + +func GetTracer(tracerName string) trace.Tracer { + ts := GetTelemetrySystem() + return ts.TracerProvider.Tracer(tracerName) +} + +func GetMeter(meterName string) metric.Meter { + ts := GetTelemetrySystem() + return ts.MeterProvider.Meter(meterName) +} + +func (ts TelemetrySystem) Shutdown(ctx context.Context) error { + return errors.Join( + ts.TracerProvider.Shutdown(ctx), + ts.MeterProvider.Shutdown(ctx), + ts.spanExporter.Shutdown(ctx), + ts.metricExporter.Shutdown(ctx), + ) +} + +func CallWithTelemetryOnErrorOnly[TResult any](tracer trace.Tracer, spanName string, parentCtx context.Context, fn func(ctx context.Context) (TResult, error)) (TResult, error) { + spanCtx, span := tracer.Start(parentCtx, spanName, trace.WithAttributes(attribute.Bool(suppressIfSuccessful, true))) + defer span.End() + + result, err := fn(spanCtx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return result, err +} + +func CallWithTelemetry[TResult any](tracer trace.Tracer, spanName string, parentCtx context.Context, fn func(ctx context.Context) (TResult, error)) (TResult, error) { + spanCtx, span := tracer.Start(parentCtx, spanName) + defer span.End() + + result, err := fn(spanCtx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return result, err +} + +func CallWithTelemetryNoResult(tracer trace.Tracer, spanName string, parentCtx context.Context, fn func(ctx context.Context) error) error { + spanCtx, span := tracer.Start(parentCtx, spanName) + defer span.End() + + err := fn(spanCtx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err +} + +type TelemetryAttribute interface { + int | int64 | bool | float64 | string | []int | []int64 | []bool | []float64 | []string +} + +func SetAttribute[T TelemetryAttribute](ctx context.Context, key string, value T) { + span := trace.SpanFromContext(ctx) + + switch v := (any)(value).(type) { + case int: + span.SetAttributes(attribute.Int(key, v)) + case int64: + span.SetAttributes(attribute.Int64(key, v)) + case bool: + span.SetAttributes(attribute.Bool(key, v)) + case float64: + span.SetAttributes(attribute.Float64(key, v)) + case string: + span.SetAttributes(attribute.String(key, v)) + case []int: + span.SetAttributes(attribute.IntSlice(key, v)) + case []int64: + span.SetAttributes(attribute.Int64Slice(key, v)) + case []bool: + span.SetAttributes(attribute.BoolSlice(key, v)) + case []float64: + span.SetAttributes(attribute.Float64Slice(key, v)) + case []string: + span.SetAttributes(attribute.StringSlice(key, v)) + default: + // This should never happen + fmt.Printf("unknown telemetry type for key %s", key) + } +} + +func AddEvent(ctx context.Context, name string, attributes ...attribute.KeyValue) { + span := trace.SpanFromContext(ctx) + if len(attributes) == 0 { + span.AddEvent(name) + return + } + + span.AddEvent(name, trace.WithAttributes(attributes...)) +} + +func SetError(span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } +} + +var hash = sha256.New() + +func HashValue(value string) string { + hash.Reset() + + hash.Write([]byte(value)) + hashBytes := hash.Sum(nil) + return fmt.Sprintf("%x", hashBytes) +} diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go new file mode 100644 index 00000000..3cccdc34 --- /dev/null +++ b/internal/telemetry/telemetry_test.go @@ -0,0 +1,72 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package telemetry + +import ( + "context" + "testing" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func TestTelemetryResourceSetsServiceName(t *testing.T) { + resource := newTelemetryResource("dcp") + + value, found := resource.Set().Value(attribute.Key("service.name")) + if !found { + t.Fatal("expected service.name resource attribute") + } + + if value.AsString() != "dcp" { + t.Fatalf("unexpected service.name: %q", value.AsString()) + } +} + +func TestStartupContextFromAnnotationsExtractsRemoteSpanContext(t *testing.T) { + startupContext, ok := StartupContextFromAnnotations(map[string]string{ + StartupOperationIDAnnotation: "operation-id", + StartupTraceParentAnnotation: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + StartupTraceStateAnnotation: "vendor=value", + }) + if !ok { + t.Fatal("expected startup context") + } + + if startupContext.OperationID != "operation-id" { + t.Fatalf("unexpected operation id: %q", startupContext.OperationID) + } + + spanContext := startupContext.RemoteSpanContext() + if !spanContext.IsValid() { + t.Fatal("expected valid remote span context") + } + + if spanContext.TraceID().String() != "4bf92f3577b34da6a3ce929d0e0e4736" { + t.Fatalf("unexpected trace id: %q", spanContext.TraceID().String()) + } + + if spanContext.SpanID().String() != "00f067aa0ba902b7" { + t.Fatalf("unexpected span id: %q", spanContext.SpanID().String()) + } +} + +func TestStartupContextRemoteSpanContextDoesNotUseCurrentSpan(t *testing.T) { + currentSpanContext := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{1}, + SpanID: trace.SpanID{1}, + }) + parentCtx := trace.ContextWithSpanContext(context.Background(), currentSpanContext) + + startupContext := StartupContext{ + TraceParent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + } + extractedCtx := startupContext.RemoteParentContext(parentCtx) + + if got := trace.SpanContextFromContext(extractedCtx).TraceID().String(); got != "4bf92f3577b34da6a3ce929d0e0e4736" { + t.Fatalf("unexpected trace id: %q", got) + } +}