Skip to content

Commit cd99da5

Browse files
committed
introduce a DUPLICATE state
1 parent 45fe359 commit cd99da5

File tree

8 files changed

+53
-150
lines changed

8 files changed

+53
-150
lines changed

diode-proto/diode/v1/reconciler.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ enum State {
1616
NO_CHANGES = 5;
1717
IGNORED = 6;
1818
ERRORED = 7;
19+
DUPLICATE = 8;
1920
}
2021

2122
// Ingestion metrics

diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go

Lines changed: 6 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

diode-server/reconciler/ingestion_processor.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ type IngestionLogToProcess struct {
7979
type IngestionProcessorOps interface {
8080
CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*ops.CreateIngestionLogResult, error)
8181
GenerateChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, branchID string) (*int32, *changeset.ChangeSet, error)
82-
MakePrimary(ctx context.Context, ingestionLogID int32) error
8382
ApplyChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, changeSetID int32, changeSet *changeset.ChangeSet) error
8483
}
8584

@@ -305,30 +304,7 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan
305304
p.metrics.RecordChangeSetCreate(ctx, true, int64(len(changeSet.Changes)))
306305
}
307306

308-
if msg.ingestionLog.IsDuplicate {
309-
// Reprocessing a duplicate record.
310-
// If there are changes or errors, make it a primary (non-duplicate) record.
311-
makePrimary := false
312-
switch msg.ingestionLog.State {
313-
case reconcilerpb.State_OPEN:
314-
makePrimary = true
315-
case reconcilerpb.State_FAILED:
316-
makePrimary = true
317-
case reconcilerpb.State_ERRORED:
318-
makePrimary = true
319-
default:
320-
}
321-
if makePrimary {
322-
err := p.ops.MakePrimary(ctx, msg.ingestionLogID)
323-
if err != nil {
324-
p.logger.Error("error making ingestion log primary", "error", err)
325-
return
326-
}
327-
msg.ingestionLog.IsDuplicate = false
328-
}
329-
}
330-
331-
if changeSet != nil && len(changeSet.Changes) > 0 {
307+
if changeSet != nil && len(changeSet.Changes) > 0 && !msg.ingestionLog.IsDuplicate {
332308
if applyChangeSetChan != nil {
333309
applyChangeSetChan <- IngestionLogToProcess{
334310
ingestionLogID: msg.ingestionLogID,

diode-server/reconciler/ingestion_processor_test.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -343,9 +343,9 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
343343
name string
344344
existingLogState reconcilerpb.State
345345
stateAfterChangeset reconcilerpb.State
346+
makePrimary bool
346347
expectSkipProcessing bool
347348
expectReuseExistingID bool
348-
expectMakePrimary bool
349349
changeSetHasChanges bool
350350
}{
351351
{
@@ -383,7 +383,7 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
383383
stateAfterChangeset: reconcilerpb.State_OPEN,
384384
expectSkipProcessing: false,
385385
expectReuseExistingID: false,
386-
expectMakePrimary: true,
386+
makePrimary: true,
387387
changeSetHasChanges: true,
388388
},
389389
{
@@ -392,7 +392,7 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
392392
stateAfterChangeset: reconcilerpb.State_NO_CHANGES,
393393
expectSkipProcessing: false,
394394
expectReuseExistingID: false,
395-
expectMakePrimary: false,
395+
makePrimary: false,
396396
changeSetHasChanges: false,
397397
},
398398
}
@@ -484,18 +484,23 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
484484

485485
mockOps.On("GenerateChangeSet", mock.Anything, changeSetLogID, ingestionLogForChangeset, "").Run(func(_ mock.Arguments) {
486486
ingestionLogForChangeset.State = tt.stateAfterChangeset
487+
if tt.makePrimary {
488+
ingestionLogForChangeset.IsDuplicate = false
489+
ingestionLogForChangeset.State = reconcilerpb.State_OPEN
490+
}
487491
}).Return(int32Ptr(1), mockChangeSet, nil)
488492

489-
if tt.expectMakePrimary {
490-
mockOps.On("MakePrimary", mock.Anything, newLogID).Return(nil)
491-
}
492-
493493
mockMetrics.On("RecordChangeSetCreate", mock.Anything, mock.Anything, mock.Anything).Return()
494494

495495
if tt.stateAfterChangeset == reconcilerpb.State_OPEN {
496496
mockOps.On("ApplyChangeSet", mock.Anything, changeSetLogID, ingestionLogForChangeset, int32(1), mockChangeSet).Return(nil)
497497
mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return()
498498
}
499+
500+
if tt.makePrimary {
501+
mockOps.On("ApplyChangeSet", mock.Anything, newLogID, ingestionLogForChangeset, int32(1), mockChangeSet).Return(nil)
502+
mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return()
503+
}
499504
}
500505

501506
mockMetrics.On("RecordHandleMessage", mock.Anything, mock.Anything).Return()

diode-server/reconciler/mocks/ingestionprocessorops.go

Lines changed: 0 additions & 47 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

diode-server/reconciler/ops.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ func (o *Ops) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb
6868
if err := o.repository.MarkIngestionLogAsDuplicate(ctx, *id, *existingID); err != nil {
6969
return nil, fmt.Errorf("failed to mark record as duplicate: %w", err)
7070
}
71-
// for clarity, duplicate logs are marked as IGNORED
72-
if err := o.repository.UpdateIngestionLogStateWithError(ctx, *id, reconcilerpb.State_IGNORED, nil); err != nil {
71+
if err := o.repository.UpdateIngestionLogStateWithError(ctx, *id, reconcilerpb.State_DUPLICATE, nil); err != nil {
7372
return nil, fmt.Errorf("failed to update ingestion log state: %w", err)
7473
}
74+
result.Created.IngestionLog.State = reconcilerpb.State_DUPLICATE
7575
result.Created.IngestionLog.IsDuplicate = true
7676
result.DuplicateOf = &ops.IngestionLogRef{
7777
ID: *existingID,
@@ -108,6 +108,12 @@ func (o *Ops) GenerateChangeSet(ctx context.Context, ingestionLogID int32, inges
108108
if err2 := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_FAILED, changeSetErr); err2 != nil {
109109
err = errors.Join(err, err2)
110110
}
111+
if ingestionLog.IsDuplicate {
112+
ingestionLog.IsDuplicate = false
113+
if err := o.repository.MarkIngestionLogAsPrimary(ctx, ingestionLogID); err != nil {
114+
err = errors.Join(err, err)
115+
}
116+
}
111117

112118
cs := differ.FailedDiffChangeSet(ingestEntity, branchID)
113119
id, err1 := o.repository.CreateChangeSet(ctx, *cs, ingestionLogID)
@@ -124,27 +130,36 @@ func (o *Ops) GenerateChangeSet(ctx context.Context, ingestionLogID int32, inges
124130
return nil, nil, err
125131
}
126132

127-
state := reconcilerpb.State_OPEN
128-
if len(changeSet.Changes) == 0 {
129-
state = reconcilerpb.State_NO_CHANGES
130-
}
131-
ingestionLog.State = state
133+
if !ingestionLog.IsDuplicate {
134+
state := reconcilerpb.State_OPEN
135+
if len(changeSet.Changes) == 0 {
136+
state = reconcilerpb.State_NO_CHANGES
137+
}
138+
ingestionLog.State = state
132139

133-
if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, state, nil); err != nil {
134-
o.logger.Warn("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err)
135-
// TODO(ltucker): This should be in a transaction. Can leave an inconsistent state marked on the ingestion log.
136-
// return nil, err
140+
if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, state, nil); err != nil {
141+
o.logger.Warn("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err)
142+
// TODO(ltucker): This should be in a transaction. Can leave an inconsistent state marked on the ingestion log.
143+
// return nil, err
144+
}
145+
} else if len(changeSet.Changes) > 0 {
146+
// we reprocessed a duplicate record and found new changes ...
147+
// we need to mark it as a primary record
148+
if err := o.repository.MarkIngestionLogAsPrimary(ctx, ingestionLogID); err != nil {
149+
return nil, nil, err
150+
}
151+
ingestionLog.IsDuplicate = false
152+
ingestionLog.State = reconcilerpb.State_OPEN
153+
if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_OPEN, nil); err != nil {
154+
o.logger.Warn("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err)
155+
return nil, nil, err
156+
}
137157
}
138158

139159
o.logger.Debug("change set generated", "id", changeSetID, "externalID", changeSet.ID, "ingestionLogID", ingestionLogID)
140160
return changeSetID, changeSet, nil
141161
}
142162

143-
// MakePrimary makes an ingestion log a primary record
144-
func (o *Ops) MakePrimary(ctx context.Context, ingestionLogID int32) error {
145-
return o.repository.MarkIngestionLogAsPrimary(ctx, ingestionLogID)
146-
}
147-
148163
// ApplyChangeSet applies change set to NetBox and updates related states
149164
func (o *Ops) ApplyChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, changeSetID int32, changeSet *changeset.ChangeSet) error {
150165
if err := applier.ApplyChangeSet(ctx, o.logger, *changeSet, o.nbClient); err != nil {

diode-server/reconciler/ops_test.go

Lines changed: 1 addition & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ func TestOpsCreateIngestionLog(t *testing.T) {
350350
mockRepository.EXPECT().MarkIngestionLogAsDuplicate(mock.Anything, *tt.mockCreateIngestionLog.id, *tt.mockFindPriorIngestionLogID).
351351
Return(tt.mockMarkAsDuplicateError)
352352
if tt.mockMarkAsDuplicateError == nil {
353-
mockRepository.EXPECT().UpdateIngestionLogStateWithError(mock.Anything, *tt.mockCreateIngestionLog.id, pb.State_IGNORED, nil).
353+
mockRepository.EXPECT().UpdateIngestionLogStateWithError(mock.Anything, *tt.mockCreateIngestionLog.id, pb.State_DUPLICATE, nil).
354354
Return(nil)
355355
}
356356
}
@@ -381,55 +381,3 @@ func TestOpsCreateIngestionLog(t *testing.T) {
381381
})
382382
}
383383
}
384-
385-
func TestOpsMakePrimary(t *testing.T) {
386-
ctx := context.Background()
387-
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))
388-
389-
tests := []struct {
390-
name string
391-
392-
ingestionLogID int32
393-
394-
mockPromoteToPrimaryError error
395-
396-
expectedError string
397-
}{
398-
{
399-
name: "successful promotion",
400-
ingestionLogID: 1234,
401-
402-
mockPromoteToPrimaryError: nil,
403-
404-
expectedError: "",
405-
},
406-
{
407-
name: "promotion fails",
408-
ingestionLogID: 1234,
409-
410-
mockPromoteToPrimaryError: fmt.Errorf("database error during promotion"),
411-
412-
expectedError: "database error during promotion",
413-
},
414-
}
415-
416-
for _, tt := range tests {
417-
t.Run(tt.name, func(t *testing.T) {
418-
mockRepository := mocks.NewRepository(t)
419-
mockNetBoxClient := pluginmocks.NewNetBoxAPI(t)
420-
opsInstance := reconciler.NewOps(mockRepository, mockNetBoxClient, logger)
421-
422-
mockRepository.EXPECT().MarkIngestionLogAsPrimary(ctx, tt.ingestionLogID).
423-
Return(tt.mockPromoteToPrimaryError)
424-
425-
err := opsInstance.MakePrimary(ctx, tt.ingestionLogID)
426-
427-
if tt.expectedError != "" {
428-
require.Error(t, err)
429-
require.Contains(t, err.Error(), tt.expectedError)
430-
} else {
431-
require.NoError(t, err)
432-
}
433-
})
434-
}
435-
}

docs/diode-proto.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4511,6 +4511,7 @@ The response from the retrieve ingestion logs request
45114511
| NO_CHANGES | 5 | |
45124512
| IGNORED | 6 | |
45134513
| ERRORED | 7 | |
4514+
| DUPLICATE | 8 | |
45144515

45154516

45164517

0 commit comments

Comments
 (0)