Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 40 additions & 61 deletions pkg/collector/corechecks/sbom/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,72 +121,56 @@ func (p *processor) processContainerImagesEvents(evBundle workloadmeta.EventBund

log.Tracef("Processing %d events", len(evBundle.Events))

// Separate events by kind and type so we can process them in an order that
// keeps imageUsers accurate when SBOMs are computed.
var (
imageSetEvents []workloadmeta.Event
imageUnsetEvents []workloadmeta.Event
containerSetEvents []workloadmeta.Event
containerUnsetEvents []workloadmeta.Event
)
// Separate events into images and containers
var imageEvents []workloadmeta.Event
var containerEvents []workloadmeta.Event

for _, event := range evBundle.Events {
switch event.Entity.GetID().Kind {
entityID := event.Entity.GetID()
switch entityID.Kind {
case workloadmeta.KindContainerImageMetadata:
if event.Type == workloadmeta.EventTypeSet {
imageSetEvents = append(imageSetEvents, event)
} else {
imageUnsetEvents = append(imageUnsetEvents, event)
}
imageEvents = append(imageEvents, event)
case workloadmeta.KindContainer:
if event.Type == workloadmeta.EventTypeSet {
containerSetEvents = append(containerSetEvents, event)
} else {
containerUnsetEvents = append(containerUnsetEvents, event)
}
containerEvents = append(containerEvents, event)
}
}

// 1. Unregister removed containers first so imageUsers is up to date before
// we compute inUse below. Processing image Set events before these removals
// would cause false-positive inUse=true on the emitted SBOM.
for _, event := range containerUnsetEvents {
p.unregisterContainer(event.Entity.(*workloadmeta.Container))
}

// 2. Unregister removed images.
for _, event := range imageUnsetEvents {
p.unregisterImage(event.Entity.(*workloadmeta.ContainerImageMetadata))
// Let the SBOM expire on back-end side
}
// Process all image events first
for _, event := range imageEvents {
Comment on lines +138 to +139
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Process container unsets before image set events

Handling all image events before container events means processImageSBOM reads imageUsers before same-bundle EventTypeUnset container removals are applied, so the emitted SBOM can report InUse=true for an image whose last running container was removed in that bundle. This is a data-accuracy regression whenever workloadmeta coalesces image refresh and container teardown together.

Useful? React with 👍 / 👎.

switch event.Type {
case workloadmeta.EventTypeSet:
filterableContainerImage := workloadfilter.CreateContainerImage(event.Entity.(*workloadmeta.ContainerImageMetadata).Name)
if p.containerFilter.IsExcluded(filterableContainerImage) {
continue
}

// 3. Register updated images and emit SBOMs; inUse now reflects the
// current set of running containers.
for _, event := range imageSetEvents {
filterableContainerImage := workloadfilter.CreateContainerImage(event.Entity.(*workloadmeta.ContainerImageMetadata).Name)
if p.containerFilter.IsExcluded(filterableContainerImage) {
continue
p.registerImage(event.Entity.(*workloadmeta.ContainerImageMetadata))
p.processImageSBOM(event.Entity.(*workloadmeta.ContainerImageMetadata))
case workloadmeta.EventTypeUnset:
p.unregisterImage(event.Entity.(*workloadmeta.ContainerImageMetadata))
// Let the SBOM expire on back-end side
}

p.registerImage(event.Entity.(*workloadmeta.ContainerImageMetadata))
p.processImageSBOM(event.Entity.(*workloadmeta.ContainerImageMetadata))
}

// 4. Register new/updated containers. registerContainer may emit a second
// SBOM for an image that just gained its first running container.
for _, event := range containerSetEvents {
container := event.Entity.(*workloadmeta.Container)
p.registerContainer(container)
// Process all container events after images
for _, event := range containerEvents {
switch event.Type {
case workloadmeta.EventTypeSet:
container := event.Entity.(*workloadmeta.Container)
p.registerContainer(container)

filterableContainer := workloadmetafilter.CreateContainer(container, nil)
if p.containerFilter.IsExcluded(filterableContainer) {
continue
}
filterableContainer := workloadmetafilter.CreateContainer(container, nil)
if p.containerFilter.IsExcluded(filterableContainer) {
continue
}

if p.procfsSBOM {
if ok, err := procfs.IsAgentContainer(container.ID); !ok && err == nil {
p.triggerProcfsScan(container)
if p.procfsSBOM {
if ok, err := procfs.IsAgentContainer(container.ID); !ok && err == nil {
p.triggerProcfsScan(container)
}
}
case workloadmeta.EventTypeUnset:
p.unregisterContainer(event.Entity.(*workloadmeta.Container))
}
}
}
Expand All @@ -211,10 +195,6 @@ func (p *processor) registerContainer(ctr *workloadmeta.Container) {
ctrID := ctr.ID

if !ctr.State.Running {
// Container is no longer running. Remove it from imageUsers so that a
// subsequent SBOM computation does not incorrectly set inUse=true for
// an image that has no running containers.
p.unregisterContainer(ctr)
return
}
Comment on lines 197 to 199
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Unregister non-running containers from image usage

Returning immediately when ctr.State.Running is false leaves any prior imageUsers[imgID][ctrID] entry intact, so a container that transitions from running to stopped via a Set event keeps the image marked as in use until/unless an Unset arrives. That causes stale InUse=true values on subsequent image SBOM emissions for runtimes/reporting paths that send stopped updates before removal.

Useful? React with 👍 / 👎.


Expand Down Expand Up @@ -401,11 +381,6 @@ func (p *processor) processImageSBOM(img *workloadmeta.ContainerImageMetadata) {
break
}
}
// Fallback for runtimes (e.g. containerd) where ctr.Image.ID is the image
// config digest rather than a repo digest, so imageUsers is keyed by img.ID.
if !inUse {
_, inUse = p.imageUsers[img.ID]
}

cyclosbom, err := sbomutil.UncompressSBOM(img.SBOM)
if err != nil {
Expand Down Expand Up @@ -439,6 +414,10 @@ func (p *processor) processImageSBOM(img *workloadmeta.ContainerImageMetadata) {
continue
}

if !inUse {
_, inUse = p.imageUsers[img.ID]
Comment on lines +417 to +418
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve img.ID fallback when computing in-use state

The img.ID lookup now runs only inside the len(repoDigests)==0 branch, so images that do have repo digests but whose containers are keyed in imageUsers by raw image ID (common with containerd-style ctr.Image.ID) are misclassified as not in use. This emits false InUse=false for actively running containers.

Useful? React with 👍 / 👎.

}

log.Infof("The image %s has no repo digest for repo %s", img.Name, repo)
}

Expand Down
218 changes: 0 additions & 218 deletions pkg/collector/corechecks/sbom/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,224 +750,6 @@ func TestProcessEvents(t *testing.T) {
}
}

// TestInUseFlagAccuracy covers three scenarios that previously caused incorrect inUse values:
// 1. Event ordering: container removal and image SBOM update arrive in the same bundle.
// The SBOM should be emitted with inUse=false because the container was removed.
// 2. Container stopped (not yet removed): a container transitions to Running=false and
// then a new SBOM update arrives. The SBOM should reflect inUse=false.
// 3. Containerd-style image ID: ctr.Image.ID is the raw image config digest rather than
// a repo digest. The image should still be reported as inUse=true when a container runs it.
func TestInUseFlagAccuracy(t *testing.T) {
const (
imageID = "sha256:9634b84c45c6ad220c3d0d2305aaa5523e47d6d43649c9bbeda46ff010b4aacd"
repoDigest = "datadog/agent@sha256:052f1fdf4f9a7117d36a1838ab60782829947683007c34b69d4991576375c409"
containerID = "fb9843d6f3e4d9506b08f5ddada74262b7ebf1cf60edb49c71d6c856fd43b75a"
)
sbomTime := time.Now()

imageEntity := &workloadmeta.ContainerImageMetadata{
EntityID: workloadmeta.EntityID{
Kind: workloadmeta.KindContainerImageMetadata,
ID: imageID,
},
RepoTags: []string{"datadog/agent:7-rc"},
RepoDigests: []string{repoDigest},
SBOM: mustCompressSBOM(t, &workloadmeta.SBOM{
CycloneDXBOM: &cyclonedx_v1_4.Bom{
SpecVersion: cyclonedx.SpecVersion1_4.String(),
Version: pointer.Ptr(int32(1)),
},
GenerationTime: sbomTime,
GenerationDuration: time.Second,
Status: workloadmeta.Success,
}),
}

runningContainer := &workloadmeta.Container{
EntityID: workloadmeta.EntityID{Kind: workloadmeta.KindContainer, ID: containerID},
Image: workloadmeta.ContainerImage{ID: repoDigest},
State: workloadmeta.ContainerState{Running: true},
}
stoppedContainer := &workloadmeta.Container{
EntityID: workloadmeta.EntityID{Kind: workloadmeta.KindContainer, ID: containerID},
Image: workloadmeta.ContainerImage{ID: repoDigest},
State: workloadmeta.ContainerState{Running: false},
}

makeExpectedSBOM := func(inUse bool) *model.SBOMEntity {
return &model.SBOMEntity{
Type: model.SBOMSourceType_CONTAINER_IMAGE_LAYERS,
Id: "datadog/agent@" + imageID,
DdTags: []string{"image_id:datadog/agent@" + imageID, "image_name:datadog/agent", "short_image:agent", "image_tag:7-rc"},
RepoTags: []string{"7-rc"},
RepoDigests: []string{repoDigest},
InUse: inUse,
GeneratedAt: timestamppb.New(sbomTime),
GenerationDuration: durationpb.New(time.Second),
Sbom: &model.SBOMEntity_Cyclonedx{Cyclonedx: &cyclonedx_v1_4.Bom{
SpecVersion: "1.4",
Version: pointer.Ptr(int32(1)),
}},
Status: model.SBOMStatus_SUCCESS,
}
}

cacheDir := t.TempDir()
cfg := configcomp.NewMockWithOverrides(t, map[string]interface{}{
"sbom.cache_directory": cacheDir,
"sbom.container_image.enabled": true,
"sbom.container_image.allow_missing_repodigest": true,
})
wmeta := fxutil.Test[option.Option[workloadmeta.Component]](t, fx.Options(
core.MockBundle(),
workloadmetafxmock.MockModule(workloadmeta.NewParams()),
))
_, err := sbomscanner.CreateGlobalScanner(cfg, wmeta)
assert.Nil(t, err)

makeBundle := func(events ...workloadmeta.Event) workloadmeta.EventBundle {
return workloadmeta.EventBundle{Events: events, Ch: make(chan struct{})}
}

newTestSetup := func(t *testing.T) (*processor, *mocksender.MockSender, *atomic.Int32, workloadmetamock.Mock) {
t.Helper()
store := fxutil.Test[workloadmetamock.Mock](t, fx.Options(
fx.Provide(func() log.Component { return logmock.New(t) }),
fx.Provide(func() configcomp.Component { return configcomp.NewMock(t) }),
fx.Supply(context.Background()),
workloadmetafxmock.MockModule(workloadmeta.NewParams()),
))
counter := atomic.NewInt32(0)
sender := mocksender.NewMockSender("")
sender.On("EventPlatformEvent", mock.Anything, mock.Anything).Return().Run(func(_ mock.Arguments) {
counter.Inc()
})
fakeTagger := taggerfxmock.SetupFakeTagger(t)
mockFilterStore := workloadfilterfxmock.SetupMockFilter(t)
// Queue size 1 so each entity becomes its own event, matching the assertion style.
p, err := newProcessor(store, mockFilterStore, sender, fakeTagger, cfg, 1, 50*time.Millisecond, time.Second)
assert.Nil(t, err)
return p, sender, counter, store
}

// Test 1: container removal and image SBOM update in the same bundle
t.Run("event ordering: container removal before image SBOM emission", func(t *testing.T) {
p, sender, counter, store := newTestSetup(t)

// Establish initial state: running container.
store.Set(imageEntity)
store.Set(runningContainer)
p.processContainerImagesEvents(makeBundle(
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: imageEntity},
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: runningContainer},
))
// Wait for the two SBOMs from the setup phase (inUse=false, then inUse=true).
assert.Eventually(t, func() bool { return counter.Load() == 2 }, time.Second, 5*time.Millisecond)
counter.Store(0)

// The container is removed AND a fresh SBOM arrives in the same bundle.
// Without the event-ordering fix the SBOM would be emitted with inUse=true
// because the image Set was processed before the container Unset.
store.Unset(runningContainer)
store.Set(imageEntity)
p.processContainerImagesEvents(makeBundle(
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: imageEntity},
workloadmeta.Event{Type: workloadmeta.EventTypeUnset, Entity: runningContainer},
))
p.stop()

assert.Eventually(t, func() bool { return counter.Load() == 1 }, time.Second, 5*time.Millisecond)

hname, _ := hostname.Get(context.TODO())
envVarEnv := cfg.GetString("env")
encoded, err := proto.Marshal(&model.SBOMPayload{
Version: 1,
Host: hname,
Source: &sourceAgent,
Entities: []*model.SBOMEntity{makeExpectedSBOM(false)},
DdEnv: &envVarEnv,
})
assert.Nil(t, err)
sender.AssertEventPlatformEvent(t, encoded, eventplatform.EventTypeContainerSBOM)
})

// Test 2: container transitions stopped then image SBOM updates
t.Run("stopped container does not keep inUse=true", func(t *testing.T) {
p, sender, counter, store := newTestSetup(t)

// Step 1: image + running container → two SBOMs (inUse=false then inUse=true).
store.Set(imageEntity)
store.Set(runningContainer)
p.processContainerImagesEvents(makeBundle(
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: imageEntity},
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: runningContainer},
))
assert.Eventually(t, func() bool { return counter.Load() == 2 }, time.Second, 5*time.Millisecond)
counter.Store(0)

// Step 2: container stops (Running=false) and a new image SBOM arrives.
// Without the registerContainer fix the container remains in imageUsers and
// the SBOM is incorrectly emitted with inUse=true.
store.Set(stoppedContainer)
store.Set(imageEntity)
p.processContainerImagesEvents(makeBundle(
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: stoppedContainer},
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: imageEntity},
))
p.stop()

assert.Eventually(t, func() bool { return counter.Load() == 1 }, time.Second, 5*time.Millisecond)

hname, _ := hostname.Get(context.TODO())
envVarEnv := cfg.GetString("env")
encoded, err := proto.Marshal(&model.SBOMPayload{
Version: 1,
Host: hname,
Source: &sourceAgent,
Entities: []*model.SBOMEntity{makeExpectedSBOM(false)},
DdEnv: &envVarEnv,
})
assert.Nil(t, err)
sender.AssertEventPlatformEvent(t, encoded, eventplatform.EventTypeContainerSBOM)
})

// Test 3: containerd-style image ID (raw sha256, not repo digest)
t.Run("containerd image ID key fallback", func(t *testing.T) {
p, sender, counter, store := newTestSetup(t)

// Container uses the raw image config digest as its Image.ID (containerd style).
containerdContainer := &workloadmeta.Container{
EntityID: workloadmeta.EntityID{Kind: workloadmeta.KindContainer, ID: containerID},
Image: workloadmeta.ContainerImage{ID: imageID}, // raw sha256, not repo digest
State: workloadmeta.ContainerState{Running: true},
}

store.Set(imageEntity)
store.Set(containerdContainer)
p.processContainerImagesEvents(makeBundle(
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: imageEntity},
workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: containerdContainer},
))
p.stop()

// Expect two SBOMs: first from image Set (inUse=false, no containers yet),
// second from registerContainer (inUse=true via img.ID fallback).
assert.Eventually(t, func() bool { return counter.Load() == 2 }, time.Second, 5*time.Millisecond)

hname, _ := hostname.Get(context.TODO())
envVarEnv := cfg.GetString("env")
encoded, err := proto.Marshal(&model.SBOMPayload{
Version: 1,
Host: hname,
Source: &sourceAgent,
Entities: []*model.SBOMEntity{makeExpectedSBOM(true)},
DdEnv: &envVarEnv,
})
assert.Nil(t, err)
sender.AssertEventPlatformEvent(t, encoded, eventplatform.EventTypeContainerSBOM)
})
}

func mustCompressSBOM(t *testing.T, sbom *workloadmeta.SBOM) *workloadmeta.CompressedSBOM {
t.Helper()

Expand Down
Loading