Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion cmd/dcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ package main
//go:generate goversioninfo

import (
"context"
"os"
"path/filepath"
"sync"
"time"

kubeapiserver "k8s.io/apiserver/pkg/server"

cmdutil "github.com/microsoft/dcp/internal/commands"
"github.com/microsoft/dcp/internal/dcp/commands"
"github.com/microsoft/dcp/internal/telemetry"
"github.com/microsoft/dcp/pkg/logger"
"github.com/microsoft/dcp/pkg/osutil"
"github.com/microsoft/dcp/pkg/resiliency"
Expand Down Expand Up @@ -46,15 +51,77 @@ func main() {

ctx := kubeapiserver.SetupSignalContext()

telemetrySystem := telemetry.GetTelemetrySystem()
var shutdownTelemetryOnce sync.Once
shutdownTelemetry := func() {
shutdownTelemetryOnce.Do(func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
_ = telemetrySystem.Shutdown(shutdownCtx)
})
}
defer shutdownTelemetry()

ctx, commandSpan := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanCommand)
telemetry.SetAttribute(ctx, telemetry.StartupAttributeProcessPID, os.Getpid())
telemetry.SetAttribute(ctx, telemetry.StartupAttributeProcessExecutableName, filepath.Base(os.Args[0]))
if len(os.Args) > 1 {
telemetry.SetAttribute(ctx, telemetry.StartupAttributeCommandArgumentCount, len(os.Args)-1)
}
if commandName := getCommandName(os.Args); commandName != "" {
telemetry.SetAttribute(ctx, telemetry.StartupAttributeCommandName, commandName)
}
spanEnded := false
endCommandSpan := func() {
if !spanEnded {
commandSpan.End()
spanEnded = true
}
}
defer endCommandSpan()

telemetry.AddEvent(ctx, telemetry.StartupEventCommandRootCommandCreating)
root, err := commands.NewRootCmd(log)
if err != nil {
telemetry.SetAttribute(ctx, telemetry.StartupAttributeCommandExitCode, errSetup)
commandSpan.SetError(err)
endCommandSpan()
shutdownTelemetry()
cmdutil.ErrorExit(log, err, errSetup)
}
telemetry.AddEvent(ctx, telemetry.StartupEventCommandRootCommandCreated)

telemetry.AddEvent(ctx, telemetry.StartupEventCommandExecuteStart)
endCommandSpan()

err = root.ExecuteContext(ctx)
lifetimeCtx, lifetimeSpan := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanCommandLifetime)
if commandName := getCommandName(os.Args); commandName != "" {
telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeCommandName, commandName)
}
telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeCommandArgumentCount, max(len(os.Args)-1, 0))

err = root.ExecuteContext(lifetimeCtx)
if err != nil {
telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeCommandExitCode, errCommand)
telemetry.AddEvent(lifetimeCtx, telemetry.StartupEventCommandExecuteFailed)
lifetimeSpan.SetError(err)
lifetimeSpan.End()
shutdownTelemetry()
cmdutil.ErrorExit(log, err, errCommand)
} else {
telemetry.SetAttribute(lifetimeCtx, telemetry.StartupAttributeCommandExitCode, 0)
telemetry.AddEvent(lifetimeCtx, telemetry.StartupEventCommandExecuteSucceeded)
lifetimeSpan.End()
log.Flush()
}
}

// getCommandName returns the first argument after the program name that is
// non-empty and does not start with '-'. It returns "" when no such argument
// exists, including when args is nil, empty, or holds only the program name.
//
// NOTE(review): the value of a space-separated flag (e.g. "--port 8080") is
// indistinguishable from a command name here and would be returned — confirm
// this heuristic is acceptable for the telemetry attribute it feeds.
func getCommandName(args []string) string {
	// args[0] is the program name. Slicing args[1:] on an empty slice panics,
	// so bail out before touching it.
	if len(args) < 2 {
		return ""
	}

	for _, arg := range args[1:] {
		if arg != "" && arg[0] != '-' {
			return arg
		}
	}

	return ""
}
25 changes: 21 additions & 4 deletions controllers/container_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/microsoft/dcp/internal/containers"
"github.com/microsoft/dcp/internal/health"
"github.com/microsoft/dcp/internal/networking"
"github.com/microsoft/dcp/internal/telemetry"
"github.com/microsoft/dcp/internal/templating"
"github.com/microsoft/dcp/internal/version"
"github.com/microsoft/dcp/pkg/commonapi"
Expand Down Expand Up @@ -204,7 +205,13 @@ func (r *ContainerReconciler) SetupWithManager(mgr ctrl.Manager, name string) er
Complete(r)
}

func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
ctx, span := r.StartReconciliationSpan(ctx, req)
defer func() {
span.SetError(err)
span.End()
}()

reader, log := r.StartReconciliation(req)

if ctx.Err() != nil {
Expand All @@ -213,7 +220,7 @@ func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

container := apiv1.Container{}
err := reader.Get(ctx, req.NamespacedName, &container)
err = reader.Get(ctx, req.NamespacedName, &container)

if err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -229,6 +236,9 @@ func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
getSucceededCounter.Add(ctx, 1)
}

r.SetObjectAttributes(ctx, &container)
telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceState, string(container.Status.State))

log = log.WithValues(logger.RESOURCE_LOG_STREAM_ID, container.GetResourceId())

if container.Status.ContainerID != "" {
Expand Down Expand Up @@ -260,17 +270,24 @@ func (r *ContainerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
additionalReconcileDelay = LongDelay
}

result, saveErr := r.SaveChangesWithDelay(ctx, &container, patch, change, additionalReconcileDelay, nil, log)
return result, saveErr
result, err = r.SaveChangesWithDelay(ctx, &container, patch, change, additionalReconcileDelay, nil, log)
telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceFinalState, string(container.Status.State))
return result, err
}

func (r *ContainerReconciler) manageContainer(ctx context.Context, container *apiv1.Container, log logr.Logger) objectChange {
ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanContainerManage)
r.SetObjectAttributes(ctx, container)
defer span.End()

targetContainerState := container.Status.State
telemetry.SetAttribute(ctx, telemetry.StartupAttributeContainerTargetState, string(targetContainerState))
_, rcd := r.runningContainers.BorrowByNamespacedName(container.NamespacedName())
if rcd != nil {
// In-memory container state is not subject to issues related to caching and
// status updates failed due to conflict, so it is fresher and has precedence.
targetContainerState = rcd.containerState
telemetry.SetAttribute(ctx, telemetry.StartupAttributeContainerRunState, string(rcd.containerState))
}

// Even if the new container state is (as it is usually the case) the same as the target state,
Expand Down
21 changes: 19 additions & 2 deletions controllers/executable_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"github.com/microsoft/dcp/internal/health"
"github.com/microsoft/dcp/internal/logs"
"github.com/microsoft/dcp/internal/networking"
"github.com/microsoft/dcp/internal/telemetry"
"github.com/microsoft/dcp/internal/templating"
"github.com/microsoft/dcp/pkg/commonapi"
"github.com/microsoft/dcp/pkg/concurrency"
Expand Down Expand Up @@ -142,7 +143,13 @@
Status will be updated based on the status of the corresponding run and the run will be terminated if
the Executable is deleted.
*/
func (r *ExecutableReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *ExecutableReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
ctx, span := r.StartReconciliationSpan(ctx, req)
defer func() {
span.SetError(err)
span.End()
}()

reader, log := r.StartReconciliation(req)

// Check to see if the request context has already expired
Expand All @@ -153,7 +160,7 @@

// Retrieve the Executable object
exe := apiv1.Executable{}
if err := reader.Get(ctx, req.NamespacedName, &exe); err != nil {

Check failure on line 163 in controllers/executable_controller.go

View workflow job for this annotation

GitHub Actions / Lint ubuntu-latest

shadow: declaration of "err" shadows declaration at line 146 (govet)
if apierrors.IsNotFound(err) {
// Ensure the cache of Executable to run ID is cleared
r.runs.DeleteByNamespacedName(req.NamespacedName)
Expand All @@ -169,6 +176,9 @@
getSucceededCounter.Add(ctx, 1)
}

r.SetObjectAttributes(ctx, &exe)
telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceState, string(exe.Status.State))

// Route logs to the resource sink
log = log.WithValues(logger.RESOURCE_LOG_STREAM_ID, exe.GetResourceId())

Expand All @@ -185,7 +195,8 @@
change = r.manageExecutable(ctx, &exe, log)
}

result, err := r.SaveChanges(ctx, &exe, patch, change, nil, log)
result, err = r.SaveChanges(ctx, &exe, patch, change, nil, log)
telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceFinalState, string(exe.Status.State))
if exe.Done() {
log.V(1).Info("Executable reached done state")
}
Expand Down Expand Up @@ -226,12 +237,18 @@
}

func (r *ExecutableReconciler) manageExecutable(ctx context.Context, exe *apiv1.Executable, log logr.Logger) objectChange {
ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanExecutableManage)
r.SetObjectAttributes(ctx, exe)
defer span.End()

targetExecutableState := exe.Status.State
telemetry.SetAttribute(ctx, telemetry.StartupAttributeExecutableTargetState, string(targetExecutableState))
_, runInfo := r.runs.BorrowByNamespacedName(exe.NamespacedName())
if runInfo != nil {
// In-memory run info is not subject to issues related to caching
// and failure to update Executable Status due to conflict, so it takes precedence.
targetExecutableState = runInfo.ExeState
telemetry.SetAttribute(ctx, telemetry.StartupAttributeExecutableRunState, string(runInfo.ExeState))
}

// Even if Executable.State == targetExecutableState, we still want to run the initializer
Expand Down
17 changes: 17 additions & 0 deletions controllers/reconciler_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/go-logr/logr"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -101,6 +102,22 @@ func (rb *ReconcilerBase[T, PT]) StartReconciliation(req ctrl.Request) (ctrl_cli
return reader, log
}

// StartReconciliationSpan starts a startup telemetry span named
// telemetry.StartupSpanControllerReconcile for the given reconciliation
// request and returns the context and span produced by
// telemetry.StartStartupSpan.
//
// The span is tagged with the reconciler's kind (as both the controller kind
// and the resource kind) and with the name and namespace taken from req.
func (rb *ReconcilerBase[T, PT]) StartReconciliationSpan(ctx context.Context, req ctrl.Request) (context.Context, telemetry.StartupSpan) {
	controllerKindAttr := attribute.String(telemetry.StartupAttributeControllerKind, rb.kind)
	resourceKindAttr := attribute.String(telemetry.StartupAttributeResourceKind, rb.kind)
	resourceNameAttr := attribute.String(telemetry.StartupAttributeResourceName, req.Name)
	resourceNamespaceAttr := attribute.String(telemetry.StartupAttributeResourceNamespace, req.Namespace)

	spanCtx, span := telemetry.StartStartupSpan(
		ctx,
		telemetry.StartupSpanControllerReconcile,
		controllerKindAttr,
		resourceKindAttr,
		resourceNameAttr,
		resourceNamespaceAttr,
	)
	return spanCtx, span
}

// SetObjectAttributes passes the reconciler's kind together with the
// namespace, name, and annotations read from obj's object metadata to
// telemetry.SetDcpResourceAttributes, using the supplied context.
func (rb *ReconcilerBase[T, PT]) SetObjectAttributes(ctx context.Context, obj PT) {
	// Fetch the metadata once and read every field from the same value.
	meta := obj.GetObjectMeta()

	namespace := meta.GetNamespace()
	name := meta.GetName()
	annotations := meta.GetAnnotations()

	telemetry.SetDcpResourceAttributes(ctx, rb.kind, namespace, name, annotations)
}

// Schedules reconciliation for specific object identified by namespaced name.
func (rb *ReconcilerBase[T, PT]) ScheduleReconciliation(nn types.NamespacedName) {
rb.debouncer.ReconciliationNeeded(rb.LifetimeCtx, nn)
Expand Down
35 changes: 27 additions & 8 deletions controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"net"
"os"
"path/filepath"
"sync/atomic"

"github.com/go-logr/logr"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -151,19 +150,22 @@
}
}

func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues(
"Service", req.NamespacedName.String(),
"Reconciliation", atomic.AddUint32(&r.reconciliationSeqNo, 1),
)
func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
ctx, span := r.StartReconciliationSpan(ctx, req)
defer func() {
span.SetError(err)
span.End()
}()

reader, log := r.StartReconciliation(req)

if ctx.Err() != nil {
log.V(1).Info("Request context expired, nothing to do...")
return ctrl.Result{}, nil
}

svc := apiv1.Service{}
if err := r.Get(ctx, req.NamespacedName, &svc); err != nil {
if err := reader.Get(ctx, req.NamespacedName, &svc); err != nil {

Check failure on line 168 in controllers/service_controller.go

View workflow job for this annotation

GitHub Actions / Lint ubuntu-latest

shadow: declaration of "err" shadows declaration at line 153 (govet)
if apimachinery_errors.IsNotFound(err) {
log.V(1).Info("The Service object does not exist yet or was deleted")
r.stopService(req.NamespacedName, nil, log)
Expand All @@ -178,6 +180,9 @@
getSucceededCounter.Add(ctx, 1)
}

r.SetObjectAttributes(ctx, &svc)
telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceState, string(svc.Status.State))

log = log.WithValues(logger.RESOURCE_LOG_STREAM_ID, svc.GetResourceId())

var change objectChange
Expand All @@ -198,7 +203,8 @@
}
}

result, err := r.SaveChangesWithDelay(ctx, &svc, patch, change, r.config.AdditionalReconciliationDelay, nil, log)
result, err = r.SaveChangesWithDelay(ctx, &svc, patch, change, r.config.AdditionalReconciliationDelay, nil, log)
telemetry.SetAttribute(ctx, telemetry.StartupAttributeResourceFinalState, string(svc.Status.State))
return result, err
}

Expand All @@ -214,7 +220,20 @@
}

func (r *ServiceReconciler) ensureServiceEffectiveAddressAndPort(ctx context.Context, svc *apiv1.Service, log logr.Logger) objectChange {
ctx, span := telemetry.StartStartupSpan(ctx, telemetry.StartupSpanServiceEnsureEffectiveAddress)
r.SetObjectAttributes(ctx, svc)
defer func() {
telemetry.SetAttribute(ctx, telemetry.StartupAttributeServiceFinalState, string(svc.Status.State))
if svc.Status.EffectivePort != 0 {
telemetry.SetAttribute(ctx, telemetry.StartupAttributeServiceEffectivePort, int(svc.Status.EffectivePort))
}
span.End()
}()

oldState := svc.Status.State
telemetry.SetAttribute(ctx, telemetry.StartupAttributeServicePreviousState, string(oldState))
telemetry.SetAttribute(ctx, telemetry.StartupAttributeServiceAddressAllocation, string(svc.Spec.AddressAllocationMode))

oldEffectiveAddress := svc.Status.EffectiveAddress
oldEffectivePort := svc.Status.EffectivePort
oldEndpointNamespacedName := types.NamespacedName{
Expand Down
Loading
Loading