Skip to content

Commit 8252381

Browse files
committed
introduce a DUPLICATE state, remove duplicate flag
1 parent 45fe359 commit 8252381

File tree

9 files changed

+51
-172
lines changed

9 files changed

+51
-172
lines changed

diode-proto/diode/v1/reconciler.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ enum State {
1616
NO_CHANGES = 5;
1717
IGNORED = 6;
1818
ERRORED = 7;
19+
DUPLICATE = 8;
1920
}
2021

2122
// Ingestion metrics
@@ -51,7 +52,6 @@ message IngestionLog {
5152
ChangeSet change_set = 12;
5253
string object_type = 13;
5354
int64 source_ts = 14;
54-
bool is_duplicate = 15;
5555
}
5656

5757
// The request to retrieve ingestion logs

diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go

Lines changed: 8 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

diode-server/reconciler/ingestion_processor.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ type IngestionLogToProcess struct {
7979
type IngestionProcessorOps interface {
8080
CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*ops.CreateIngestionLogResult, error)
8181
GenerateChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, branchID string) (*int32, *changeset.ChangeSet, error)
82-
MakePrimary(ctx context.Context, ingestionLogID int32) error
8382
ApplyChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, changeSetID int32, changeSet *changeset.ChangeSet) error
8483
}
8584

@@ -305,30 +304,7 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan
305304
p.metrics.RecordChangeSetCreate(ctx, true, int64(len(changeSet.Changes)))
306305
}
307306

308-
if msg.ingestionLog.IsDuplicate {
309-
// Reprocessing a duplicate record.
310-
// If there are changes or errors, make it a primary (non-duplicate) record.
311-
makePrimary := false
312-
switch msg.ingestionLog.State {
313-
case reconcilerpb.State_OPEN:
314-
makePrimary = true
315-
case reconcilerpb.State_FAILED:
316-
makePrimary = true
317-
case reconcilerpb.State_ERRORED:
318-
makePrimary = true
319-
default:
320-
}
321-
if makePrimary {
322-
err := p.ops.MakePrimary(ctx, msg.ingestionLogID)
323-
if err != nil {
324-
p.logger.Error("error making ingestion log primary", "error", err)
325-
return
326-
}
327-
msg.ingestionLog.IsDuplicate = false
328-
}
329-
}
330-
331-
if changeSet != nil && len(changeSet.Changes) > 0 {
307+
if changeSet != nil && len(changeSet.Changes) > 0 && msg.ingestionLog.State != reconcilerpb.State_DUPLICATE {
332308
if applyChangeSetChan != nil {
333309
applyChangeSetChan <- IngestionLogToProcess{
334310
ingestionLogID: msg.ingestionLogID,

diode-server/reconciler/ingestion_processor_test.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -343,9 +343,9 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
343343
name string
344344
existingLogState reconcilerpb.State
345345
stateAfterChangeset reconcilerpb.State
346+
makePrimary bool
346347
expectSkipProcessing bool
347348
expectReuseExistingID bool
348-
expectMakePrimary bool
349349
changeSetHasChanges bool
350350
}{
351351
{
@@ -383,7 +383,7 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
383383
stateAfterChangeset: reconcilerpb.State_OPEN,
384384
expectSkipProcessing: false,
385385
expectReuseExistingID: false,
386-
expectMakePrimary: true,
386+
makePrimary: true,
387387
changeSetHasChanges: true,
388388
},
389389
{
@@ -392,7 +392,7 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
392392
stateAfterChangeset: reconcilerpb.State_NO_CHANGES,
393393
expectSkipProcessing: false,
394394
expectReuseExistingID: false,
395-
expectMakePrimary: false,
395+
makePrimary: false,
396396
changeSetHasChanges: false,
397397
},
398398
}
@@ -445,11 +445,10 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
445445
Created: ops.IngestionLogRef{
446446
ID: newLogID,
447447
IngestionLog: &reconcilerpb.IngestionLog{
448-
Id: "new-log-id",
449-
ObjectType: "dcim.site",
450-
State: reconcilerpb.State_QUEUED,
451-
Entity: testEntity,
452-
IsDuplicate: true,
448+
Id: "new-log-id",
449+
ObjectType: "dcim.site",
450+
State: reconcilerpb.State_DUPLICATE,
451+
Entity: testEntity,
453452
},
454453
},
455454
DuplicateOf: &ops.IngestionLogRef{
@@ -484,18 +483,22 @@ func TestIngestionProcessor_DuplicateHandling(t *testing.T) {
484483

485484
mockOps.On("GenerateChangeSet", mock.Anything, changeSetLogID, ingestionLogForChangeset, "").Run(func(_ mock.Arguments) {
486485
ingestionLogForChangeset.State = tt.stateAfterChangeset
486+
if tt.makePrimary {
487+
ingestionLogForChangeset.State = reconcilerpb.State_OPEN
488+
}
487489
}).Return(int32Ptr(1), mockChangeSet, nil)
488490

489-
if tt.expectMakePrimary {
490-
mockOps.On("MakePrimary", mock.Anything, newLogID).Return(nil)
491-
}
492-
493491
mockMetrics.On("RecordChangeSetCreate", mock.Anything, mock.Anything, mock.Anything).Return()
494492

495493
if tt.stateAfterChangeset == reconcilerpb.State_OPEN {
496494
mockOps.On("ApplyChangeSet", mock.Anything, changeSetLogID, ingestionLogForChangeset, int32(1), mockChangeSet).Return(nil)
497495
mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return()
498496
}
497+
498+
if tt.makePrimary {
499+
mockOps.On("ApplyChangeSet", mock.Anything, newLogID, ingestionLogForChangeset, int32(1), mockChangeSet).Return(nil)
500+
mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return()
501+
}
499502
}
500503

501504
mockMetrics.On("RecordHandleMessage", mock.Anything, mock.Anything).Return()

diode-server/reconciler/mocks/ingestionprocessorops.go

Lines changed: 0 additions & 47 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

diode-server/reconciler/ops.go

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,10 @@ func (o *Ops) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb
6868
if err := o.repository.MarkIngestionLogAsDuplicate(ctx, *id, *existingID); err != nil {
6969
return nil, fmt.Errorf("failed to mark record as duplicate: %w", err)
7070
}
71-
// for clarity, duplicate logs are marked as IGNORED
72-
if err := o.repository.UpdateIngestionLogStateWithError(ctx, *id, reconcilerpb.State_IGNORED, nil); err != nil {
71+
if err := o.repository.UpdateIngestionLogStateWithError(ctx, *id, reconcilerpb.State_DUPLICATE, nil); err != nil {
7372
return nil, fmt.Errorf("failed to update ingestion log state: %w", err)
7473
}
75-
result.Created.IngestionLog.IsDuplicate = true
74+
result.Created.IngestionLog.State = reconcilerpb.State_DUPLICATE
7675
result.DuplicateOf = &ops.IngestionLogRef{
7776
ID: *existingID,
7877
IngestionLog: existingLog,
@@ -91,6 +90,8 @@ func (o *Ops) GenerateChangeSet(ctx context.Context, ingestionLogID int32, inges
9190
State: int(ingestionLog.GetState()),
9291
}
9392

93+
isDuplicate := ingestionLog.State == reconcilerpb.State_DUPLICATE
94+
9495
changeSet, err := differ.Diff(ctx, ingestEntity, branchID, o.nbClient)
9596
if err != nil {
9697
tags := map[string]string{
@@ -108,6 +109,11 @@ func (o *Ops) GenerateChangeSet(ctx context.Context, ingestionLogID int32, inges
108109
if err2 := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_FAILED, changeSetErr); err2 != nil {
109110
err = errors.Join(err, err2)
110111
}
112+
if isDuplicate {
113+
if err2 := o.repository.MarkIngestionLogAsPrimary(ctx, ingestionLogID); err != nil {
114+
err = errors.Join(err, err2)
115+
}
116+
}
111117

112118
cs := differ.FailedDiffChangeSet(ingestEntity, branchID)
113119
id, err1 := o.repository.CreateChangeSet(ctx, *cs, ingestionLogID)
@@ -124,27 +130,27 @@ func (o *Ops) GenerateChangeSet(ctx context.Context, ingestionLogID int32, inges
124130
return nil, nil, err
125131
}
126132

127-
state := reconcilerpb.State_OPEN
128-
if len(changeSet.Changes) == 0 {
129-
state = reconcilerpb.State_NO_CHANGES
130-
}
131-
ingestionLog.State = state
132-
133-
if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, state, nil); err != nil {
134-
o.logger.Warn("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err)
135-
// TODO(ltucker): This should be in a transaction. Can leave an inconsistent state marked on the ingestion log.
136-
// return nil, err
133+
if len(changeSet.Changes) > 0 {
134+
ingestionLog.State = reconcilerpb.State_OPEN
135+
if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_OPEN, nil); err != nil {
136+
o.logger.Error("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err)
137+
}
138+
if isDuplicate {
139+
if err := o.repository.MarkIngestionLogAsPrimary(ctx, ingestionLogID); err != nil {
140+
o.logger.Error("failed to mark ingestion log as primary (error ignored)", "ingestionLogID", ingestionLogID, "error", err)
141+
}
142+
}
143+
} else if !isDuplicate {
144+
ingestionLog.State = reconcilerpb.State_NO_CHANGES
145+
if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_NO_CHANGES, nil); err != nil {
146+
o.logger.Error("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err)
147+
}
137148
}
138149

139150
o.logger.Debug("change set generated", "id", changeSetID, "externalID", changeSet.ID, "ingestionLogID", ingestionLogID)
140151
return changeSetID, changeSet, nil
141152
}
142153

143-
// MakePrimary makes an ingestion log a primary record
144-
func (o *Ops) MakePrimary(ctx context.Context, ingestionLogID int32) error {
145-
return o.repository.MarkIngestionLogAsPrimary(ctx, ingestionLogID)
146-
}
147-
148154
// ApplyChangeSet applies change set to NetBox and updates related states
149155
func (o *Ops) ApplyChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, changeSetID int32, changeSet *changeset.ChangeSet) error {
150156
if err := applier.ApplyChangeSet(ctx, o.logger, *changeSet, o.nbClient); err != nil {

0 commit comments

Comments
 (0)