diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index 9a44b390..8e6ae9b8 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -130,7 +130,7 @@ func main() { os.Exit(1) } - ops := reconciler.NewOps(repository, nbClient, s.Logger()) + ops := reconciler.NewOps(repository, nbClient, s.Logger(), nil) ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, metricRecorder) if err != nil { diff --git a/diode-server/dbstore/postgres/migrations/20250707184637_entity_deduplication.sql b/diode-server/dbstore/postgres/migrations/20250707184637_entity_deduplication.sql new file mode 100644 index 00000000..d550d74f --- /dev/null +++ b/diode-server/dbstore/postgres/migrations/20250707184637_entity_deduplication.sql @@ -0,0 +1,17 @@ +-- +goose Up + +ALTER TABLE ingestion_logs + ADD COLUMN entity_hash VARCHAR(64), + ADD COLUMN last_seen TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + ADD COLUMN duplicate_count INTEGER DEFAULT 0 NOT NULL; + +CREATE INDEX idx_ingestion_logs_entity_hash ON ingestion_logs(entity_hash); + +-- +goose Down + +DROP INDEX IF EXISTS idx_ingestion_logs_entity_hash; + +ALTER TABLE ingestion_logs + DROP COLUMN IF EXISTS entity_hash, + DROP COLUMN IF EXISTS last_seen, + DROP COLUMN IF EXISTS duplicate_count; \ No newline at end of file diff --git a/diode-server/dbstore/postgres/queries/change_sets.sql b/diode-server/dbstore/postgres/queries/change_sets.sql index d12d4c1c..c2ef95d7 100644 --- a/diode-server/dbstore/postgres/queries/change_sets.sql +++ b/diode-server/dbstore/postgres/queries/change_sets.sql @@ -11,3 +11,14 @@ INSERT INTO changes (external_id, change_set_id, change_type, object_type, objec sequence_number) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *; + +-- name: TruncateChangeSets :exec +DELETE FROM 
change_sets cs1 +WHERE cs1.ingestion_log_id = $1 + AND cs1.id NOT IN ( + SELECT cs2.id + FROM change_sets cs2 + WHERE cs2.ingestion_log_id = $1 + ORDER BY cs2.id DESC + LIMIT $2 + ); \ No newline at end of file diff --git a/diode-server/dbstore/postgres/queries/ingestion_logs.sql b/diode-server/dbstore/postgres/queries/ingestion_logs.sql index 70772a32..c58b6746 100644 --- a/diode-server/dbstore/postgres/queries/ingestion_logs.sql +++ b/diode-server/dbstore/postgres/queries/ingestion_logs.sql @@ -1,7 +1,7 @@ -- name: CreateIngestionLog :one INSERT INTO ingestion_logs (external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, - producer_app_version, sdk_name, sdk_version, entity, source_metadata) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + producer_app_version, sdk_name, sdk_version, entity, source_metadata, entity_hash) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING *; -- name: UpdateIngestionLogStateWithError :exec @@ -42,3 +42,29 @@ WHERE (v_deviations.state = sqlc.narg('state') OR sqlc.narg('state') IS NULL) sqlc.narg('ingestion_ts_end') IS NULL) ORDER BY v_deviations.id DESC LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset'); + +-- name: FindPriorIngestionLogByEntityHash :one +WITH latest_change_sets AS ( + SELECT DISTINCT ON (ingestion_log_id) + ingestion_log_id, + branch_id + FROM change_sets + ORDER BY ingestion_log_id, id DESC +) +SELECT il.* +FROM ingestion_logs il +LEFT JOIN latest_change_sets lcs ON il.id = lcs.ingestion_log_id +WHERE il.entity_hash = sqlc.arg('entity_hash') + AND ( + (sqlc.narg('branch_id')::text IS NOT NULL AND lcs.branch_id = sqlc.narg('branch_id')::text) + OR + (sqlc.narg('branch_id')::text IS NULL AND lcs.branch_id IS NULL) + ) +ORDER BY il.created_at DESC +LIMIT 1; + +-- name: IncrementDuplicateCount :exec +UPDATE ingestion_logs +SET duplicate_count = duplicate_count + 1, + last_seen = CURRENT_TIMESTAMP +WHERE id = $1; diff --git 
a/diode-server/dbstore/postgres/repository.go b/diode-server/dbstore/postgres/repository.go index ed9f928e..fcb07730 100644 --- a/diode-server/dbstore/postgres/repository.go +++ b/diode-server/dbstore/postgres/repository.go @@ -29,8 +29,8 @@ func NewRepository(pool *pgxpool.Pool) *Repository { } } -// CreateIngestionLog creates a new ingestion log. -func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) { +// CreateIngestionLog creates a new ingestion log with entity hash and deduplication fields. +func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte, entityHash string) (*int32, error) { marshaler := protojson.MarshalOptions{ UseProtoNames: true, } @@ -38,6 +38,7 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon if err != nil { return nil, fmt.Errorf("failed to marshal entity: %w", err) } + params := postgres.CreateIngestionLogParams{ ExternalID: ingestionLog.Id, ObjectType: pgtype.Text{String: ingestionLog.ObjectType, Valid: true}, @@ -51,6 +52,7 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon SdkVersion: pgtype.Text{String: ingestionLog.SdkVersion, Valid: true}, Entity: entityJSON, SourceMetadata: sourceMetadata, + EntityHash: pgtype.Text{String: entityHash, Valid: true}, } createdIngestionLog, err := r.queries.CreateIngestionLog(ctx, params) @@ -70,6 +72,7 @@ func (r *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid if err != nil { return nil, nil, err } + return &ingestionLog.ID, log, nil } @@ -434,3 +437,38 @@ func deviationToProto(dbDeviation postgres.VDeviation) (*reconcilerpb.Deviation, return deviation, nil } + +// FindPriorIngestionLogByEntityHash finds a prior deviation with the same entity hash, considering branch context. 
+func (r *Repository) FindPriorIngestionLogByEntityHash(ctx context.Context, entityHash string, currentBranch *string) (*int32, *reconcilerpb.IngestionLog, error) { + params := postgres.FindPriorIngestionLogByEntityHashParams{ + EntityHash: pgtype.Text{String: entityHash, Valid: true}, + } + if currentBranch != nil { + params.BranchID = pgtype.Text{String: *currentBranch, Valid: true} + } + + dbLog, err := r.queries.FindPriorIngestionLogByEntityHash(ctx, params) + if err != nil { + return nil, nil, err + } + + log, err := dbLog.ToProto() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert to proto: %w", err) + } + + return &dbLog.ID, log, nil +} + +// IncrementDuplicateCount increments the duplicate count for an ingestion log +func (r *Repository) IncrementDuplicateCount(ctx context.Context, id int32) error { + return r.queries.IncrementDuplicateCount(ctx, id) +} + +// TruncateChangeSets truncates change sets for an ingestion log to the given limit (keeps latest n) +func (r *Repository) TruncateChangeSets(ctx context.Context, ingestionLogID int32, limit int32) error { + return r.queries.TruncateChangeSets(ctx, postgres.TruncateChangeSetsParams{ + IngestionLogID: ingestionLogID, + Limit: limit, + }) +} diff --git a/diode-server/entityhash/entityhash.go b/diode-server/entityhash/entityhash.go new file mode 100644 index 00000000..18e7c50d --- /dev/null +++ b/diode-server/entityhash/entityhash.go @@ -0,0 +1,63 @@ +package entityhash + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + + "github.com/gowebpki/jcs" + "google.golang.org/protobuf/encoding/protojson" + + diodepb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" +) + +// EntityFingerprinter provides methods for generating consistent hashes of entities +type EntityFingerprinter struct { + marshaler protojson.MarshalOptions +} + +// NewEntityFingerprinter creates a new entity fingerprinter with consistent options +func NewEntityFingerprinter() *EntityFingerprinter { + return 
&EntityFingerprinter{ + marshaler: protojson.MarshalOptions{ + UseProtoNames: true, // Use snake_case field names for consistency + EmitUnpopulated: false, // Don't include zero/empty values + }, + } +} + +// GenerateEntityHash creates a SHA256 hash for an entity that includes the object type +// and canonicalized entity content, excluding the timestamp from the outer envelope +func (f *EntityFingerprinter) GenerateEntityHash(entity *diodepb.Entity) (string, error) { + if entity == nil { + return "", fmt.Errorf("entity cannot be nil") + } + + // Extract the inner entity content (excluding timestamp) + entityContent := entity.GetEntity() + if entityContent == nil { + return "", fmt.Errorf("entity content cannot be nil") + } + + // Serialize the inner entity content to JSON (excluding timestamp) + entityJSON, err := f.marshaler.Marshal(&diodepb.Entity{Entity: entityContent}) + if err != nil { + return "", fmt.Errorf("failed to marshal entity: %w", err) + } + + return f.GenerateEntityHashFromJSON(entityJSON) +} + +// GenerateEntityHashFromJSON creates a SHA256 hash for an entity. +// This should already exclude the timestamp from the entity envelope. 
+func (f *EntityFingerprinter) GenerateEntityHashFromJSON(entityJSON []byte) (string, error) { + // Canonicalize the JSON using RFC 8785 JCS + canonicalJSON, err := jcs.Transform(entityJSON) + if err != nil { + return "", fmt.Errorf("failed to canonicalize JSON: %w", err) + } + + // Generate SHA256 hash + hash := sha256.Sum256(canonicalJSON) + return hex.EncodeToString(hash[:]), nil +} diff --git a/diode-server/entityhash/entityhash_test.go b/diode-server/entityhash/entityhash_test.go new file mode 100644 index 00000000..cbf3e522 --- /dev/null +++ b/diode-server/entityhash/entityhash_test.go @@ -0,0 +1,474 @@ +package entityhash_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/netboxlabs/diode/diode-server/entityhash" + diodepb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" +) + +func TestEntityFingerprinter_GenerateEntityHash(t *testing.T) { + fingerprinter := entityhash.NewEntityFingerprinter() + + tests := []struct { + name string + entity *diodepb.Entity + expectError bool + errorContains string + expectedLength int + }{ + { + name: "nil entity returns error", + entity: nil, + expectError: true, + errorContains: "entity cannot be nil", + }, + { + name: "entity with nil content returns error", + entity: &diodepb.Entity{ + Timestamp: timestamppb.Now(), + Entity: nil, + }, + expectError: true, + errorContains: "entity content cannot be nil", + }, + { + name: "simple device entity produces valid hash", + entity: &diodepb.Entity{ + Timestamp: timestamppb.Now(), + Entity: &diodepb.Entity_Device{ + Device: &diodepb.Device{ + Name: strPtr("test-device"), + }, + }, + }, + expectError: false, + expectedLength: 64, // SHA256 hex length + }, + { + name: "simple site entity produces valid hash", + entity: &diodepb.Entity{ + Timestamp: timestamppb.Now(), + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: 
"test-site", + }, + }, + }, + expectError: false, + expectedLength: 64, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hash, err := fingerprinter.GenerateEntityHash(tt.entity) + + if tt.expectError { + assert.Error(t, err) + assert.Empty(t, hash) + if tt.errorContains != "" { + assert.Contains(t, err.Error(), tt.errorContains) + } + } else { + require.NoError(t, err) + assert.Len(t, hash, tt.expectedLength) + assert.Regexp(t, "^[a-f0-9]{64}$", hash, "hash should be lowercase hex") + } + }) + } +} + +func TestEntityFingerprinter_BasicEntities(t *testing.T) { + fingerprinter := entityhash.NewEntityFingerprinter() + + timestamp := timestamppb.Now() + tests := []struct { + name string + entity1 *diodepb.Entity + entity2 *diodepb.Entity + shouldMatch bool + }{ + { + name: "identical devices produce same hash", + entity1: &diodepb.Entity{ + Timestamp: timestamp, + Entity: &diodepb.Entity_Device{ + Device: &diodepb.Device{ + Name: strPtr("device1"), + Serial: strPtr("serial123"), + }, + }, + }, + entity2: &diodepb.Entity{ + // Different timestamp is ignored + Timestamp: timestamppb.New(timestamp.AsTime().Add(10 * time.Second)), + Entity: &diodepb.Entity_Device{ + Device: &diodepb.Device{ + Name: strPtr("device1"), + Serial: strPtr("serial123"), + }, + }, + }, + shouldMatch: true, + }, + { + name: "different device names produce different hashes", + entity1: &diodepb.Entity{ + Entity: &diodepb.Entity_Device{ + Device: &diodepb.Device{ + Name: strPtr("device1"), + }, + }, + }, + entity2: &diodepb.Entity{ + Entity: &diodepb.Entity_Device{ + Device: &diodepb.Device{ + Name: strPtr("device2"), + }, + }, + }, + shouldMatch: false, + }, + { + name: "different entity types produce different hashes", + entity1: &diodepb.Entity{ + Entity: &diodepb.Entity_Device{ + Device: &diodepb.Device{ + Name: strPtr("test"), + }, + }, + }, + entity2: &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test", + }, + }, + }, + 
shouldMatch: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hash1, err1 := fingerprinter.GenerateEntityHash(tt.entity1) + hash2, err2 := fingerprinter.GenerateEntityHash(tt.entity2) + + require.NoError(t, err1) + require.NoError(t, err2) + + if tt.shouldMatch { + assert.Equal(t, hash1, hash2, "hashes should be identical") + } else { + assert.NotEqual(t, hash1, hash2, "hashes should be different") + } + }) + } +} + +func TestEntityFingerprinterGenerateEntityHashFromJSON(t *testing.T) { + fingerprinter := entityhash.NewEntityFingerprinter() + + tests := []struct { + name string + entityJSON1 string + entityJSON2 string + shouldMatch bool + }{ + { + name: "same device different field order should produce same hash", + entityJSON1: `{ + "device": { + "name": "device1", + "serial": "123", + "site": {"name": "site1"} + } + }`, + entityJSON2: `{ + "device": { + "site": {"name": "site1"}, + "name": "device1", + "serial": "123" + } + }`, + shouldMatch: true, + }, + { + name: "different device data should produce different hash", + entityJSON1: `{ + "device": { + "name": "device1", + "serial": "123" + } + }`, + entityJSON2: `{ + "device": { + "name": "device1", + "serial": "456" + } + }`, + shouldMatch: false, + }, + { + name: "complex device with nested relationships", + entityJSON1: `{ + "device": { + "name": "Device ABC", + "device_type": { + "manufacturer": {"name": "Cisco"}, + "model": "C2960S" + }, + "role": {"name": "Device Role 1"}, + "tenant": {"name": "Tenant 1"}, + "platform": {"name": "Platform 1"}, + "serial": "1234567890", + "site": {"name": "Site 1"} + } + }`, + entityJSON2: `{ + "device": { + "name": "Device ABC", + "site": {"name": "Site 1"}, + "platform": {"name": "Platform 1"}, + "device_type": { + "manufacturer": {"name": "Cisco"}, + "model": "C2960S" + }, + "role": {"name": "Device Role 1"}, + "tenant": {"name": "Tenant 1"}, + "serial": "1234567890" + } + }`, + shouldMatch: true, + }, + { + name: "complex interface 
with VLANs and bridge", + entityJSON1: `{ + "interface": { + "name": "GigabitEthernet1/0/1", + "device": { + "name": "Device 1", + "role": {"name": "Device Role 1"}, + "device_type": { + "manufacturer": {"name": "Cisco"}, + "model": "C2960S" + }, + "site": {"name": "Site 1"} + }, + "type": "1000base-t", + "enabled": true, + "mtu": "9000", + "untagged_vlan": { + "vid": 100, + "name": "Data VLAN", + "status": "active" + }, + "tagged_vlans": [ + { + "vid": 101, + "name": "Voice VLAN", + "status": "active" + }, + { + "vid": 102, + "name": "Data VLAN", + "status": "active" + } + ] + } + }`, + entityJSON2: `{ + "interface": { + "name": "GigabitEthernet1/0/1", + "mtu": "9000", + "untagged_vlan": { + "vid": 100, + "name": "Data VLAN", + "status": "active" + }, + "tagged_vlans": [ + { + "vid": 101, + "name": "Voice VLAN", + "status": "active" + }, + { + "vid": 102, + "name": "Data VLAN", + "status": "active" + } + ], + "device": { + "name": "Device 1", + "role": {"name": "Device Role 1"}, + "device_type": { + "manufacturer": {"name": "Cisco"}, + "model": "C2960S" + }, + "site": {"name": "Site 1"} + }, + "type": "1000base-t", + "enabled": true + } + }`, + shouldMatch: true, + }, + { + name: "complex interface with differing nested object order", + entityJSON1: `{ + "interface": { + "name": "GigabitEthernet1/0/1", + "device": { + "name": "Device 1", + "role": {"name": "Device Role 1"}, + "device_type": { + "manufacturer": {"name": "Cisco"}, + "model": "C2960S" + }, + "site": {"name": "Site 1"} + }, + "type": "1000base-t", + "enabled": true, + "mtu": "9000", + "untagged_vlan": { + "vid": 100, + "name": "Data VLAN", + "status": "active" + }, + "tagged_vlans": [ + { + "vid": 101, + "name": "Voice VLAN", + "status": "active" + }, + { + "vid": 102, + "name": "Data VLAN", + "status": "active" + } + ] + } + }`, + entityJSON2: `{ + "interface": { + "name": "GigabitEthernet1/0/1", + "mtu": "9000", + "untagged_vlan": { + "vid": 100, + "name": "Data VLAN", + "status": "active" + }, + 
"tagged_vlans": [ + { + "vid": 102, + "name": "Data VLAN", + "status": "active" + }, + { + "vid": 101, + "name": "Voice VLAN", + "status": "active" + } + ], + "device": { + "name": "Device 1", + "role": {"name": "Device Role 1"}, + "device_type": { + "manufacturer": {"name": "Cisco"}, + "model": "C2960S" + }, + "site": {"name": "Site 1"} + }, + "type": "1000base-t", + "enabled": true + } + }`, + shouldMatch: false, + }, + { + name: "IP address with differing nested objects", + entityJSON1: `{ + "ip_address": { + "address": "192.168.100.1/24", + "vrf": { + "name": "PROD-VRF", + "rd": "65000:1" + }, + "tenant": {"name": "Tenant 1"}, + "status": "active", + "assigned_object_interface": { + "device": { + "name": "Device 1", + "device_type": { + "manufacturer": {"name": "Cisco"}, + "model": "C2960S" + }, + "role": {"name": "Device Role 1"}, + "site": {"name": "Site 1"} + }, + "name": "GigabitEthernet1/0/1", + "type": "1000base-t" + } + } + }`, + entityJSON2: `{ + "ip_address": { + "address": "192.168.100.1/24", + "vrf": { + "name": "PROD-VRF", + "rd": "65000:1" + }, + "tenant": {"name": "Tenant 1"}, + "status": "active", + "assigned_object_interface": { + "device": { + "name": "Device 1", + "device_type": { + "manufacturer": {"name": "Cisco"}, + "model": "C2960S" + }, + "role": {"name": "Device Role 1"}, + "site": {"name": "Site 2"} + }, + "name": "GigabitEthernet1/0/1", + "type": "1000base-t" + } + } + }`, + shouldMatch: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Generate hash + hash1, err := fingerprinter.GenerateEntityHashFromJSON([]byte(tt.entityJSON1)) + require.NoError(t, err) + + hash2, err := fingerprinter.GenerateEntityHashFromJSON([]byte(tt.entityJSON2)) + require.NoError(t, err) + + // Verify hash properties + assert.Len(t, hash1, 64, "hash1 should be 64 characters (SHA256)") + assert.Len(t, hash2, 64, "hash2 should be 64 characters (SHA256)") + assert.Regexp(t, "^[a-f0-9]{64}$", hash1, "hash1 should be lowercase 
hex") + assert.Regexp(t, "^[a-f0-9]{64}$", hash2, "hash2 should be lowercase hex") + + if tt.shouldMatch { + assert.Equal(t, hash1, hash2, "hashes should be identical") + } else { + assert.NotEqual(t, hash1, hash2, "hashes should be different") + } + }) + } +} + +func strPtr(s string) *string { + return &s +} diff --git a/diode-server/gen/dbstore/postgres/change_sets.sql.go b/diode-server/gen/dbstore/postgres/change_sets.sql.go index 3b742f54..faa1502d 100644 --- a/diode-server/gen/dbstore/postgres/change_sets.sql.go +++ b/diode-server/gen/dbstore/postgres/change_sets.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.28.0 +// sqlc v1.29.0 // source: change_sets.sql package postgres @@ -105,3 +105,25 @@ func (q *Queries) CreateChangeSet(ctx context.Context, arg CreateChangeSetParams ) return i, err } + +const truncateChangeSets = `-- name: TruncateChangeSets :exec +DELETE FROM change_sets cs1 +WHERE cs1.ingestion_log_id = $1 + AND cs1.id NOT IN ( + SELECT cs2.id + FROM change_sets cs2 + WHERE cs2.ingestion_log_id = $1 + ORDER BY cs2.id DESC + LIMIT $2 + ) +` + +type TruncateChangeSetsParams struct { + IngestionLogID int32 `json:"ingestion_log_id"` + Limit int32 `json:"limit"` +} + +func (q *Queries) TruncateChangeSets(ctx context.Context, arg TruncateChangeSetsParams) error { + _, err := q.db.Exec(ctx, truncateChangeSets, arg.IngestionLogID, arg.Limit) + return err +} diff --git a/diode-server/gen/dbstore/postgres/db.go b/diode-server/gen/dbstore/postgres/db.go index fd4248fe..fbd4f63c 100644 --- a/diode-server/gen/dbstore/postgres/db.go +++ b/diode-server/gen/dbstore/postgres/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. 
// versions: -// sqlc v1.28.0 +// sqlc v1.29.0 package postgres diff --git a/diode-server/gen/dbstore/postgres/deviations.sql.go b/diode-server/gen/dbstore/postgres/deviations.sql.go index b21c49f2..d025b2b0 100644 --- a/diode-server/gen/dbstore/postgres/deviations.sql.go +++ b/diode-server/gen/dbstore/postgres/deviations.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.28.0 +// sqlc v1.29.0 // source: deviations.sql package postgres diff --git a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go index c4fb640a..f0ae1c35 100644 --- a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go +++ b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.28.0 +// sqlc v1.29.0 // source: ingestion_logs.sql package postgres @@ -45,9 +45,9 @@ func (q *Queries) CountIngestionLogsPerState(ctx context.Context) ([]CountIngest const createIngestionLog = `-- name: CreateIngestionLog :one INSERT INTO ingestion_logs (external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, - producer_app_version, sdk_name, sdk_version, entity, source_metadata) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at + producer_app_version, sdk_name, sdk_version, entity, source_metadata, entity_hash) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count ` type CreateIngestionLogParams struct { @@ -63,6 +63,7 @@ type 
CreateIngestionLogParams struct { SdkVersion pgtype.Text `json:"sdk_version"` Entity []byte `json:"entity"` SourceMetadata []byte `json:"source_metadata"` + EntityHash pgtype.Text `json:"entity_hash"` } func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLogParams) (IngestionLog, error) { @@ -79,6 +80,7 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog arg.SdkVersion, arg.Entity, arg.SourceMetadata, + arg.EntityHash, ) var i IngestionLog err := row.Scan( @@ -98,12 +100,80 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog &i.SourceMetadata, &i.CreatedAt, &i.UpdatedAt, + &i.EntityHash, + &i.LastSeen, + &i.DuplicateCount, ) return i, err } +const findPriorIngestionLogByEntityHash = `-- name: FindPriorIngestionLogByEntityHash :one +WITH latest_change_sets AS ( + SELECT DISTINCT ON (ingestion_log_id) + ingestion_log_id, + branch_id + FROM change_sets + ORDER BY ingestion_log_id, id DESC +) +SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count +FROM ingestion_logs il +LEFT JOIN latest_change_sets lcs ON il.id = lcs.ingestion_log_id +WHERE il.entity_hash = $1 + AND ( + ($2::text IS NOT NULL AND lcs.branch_id = $2::text) + OR + ($2::text IS NULL AND lcs.branch_id IS NULL) + ) +ORDER BY il.created_at DESC +LIMIT 1 +` + +type FindPriorIngestionLogByEntityHashParams struct { + EntityHash pgtype.Text `json:"entity_hash"` + BranchID pgtype.Text `json:"branch_id"` +} + +func (q *Queries) FindPriorIngestionLogByEntityHash(ctx context.Context, arg FindPriorIngestionLogByEntityHashParams) (IngestionLog, error) { + row := q.db.QueryRow(ctx, findPriorIngestionLogByEntityHash, arg.EntityHash, arg.BranchID) + var i IngestionLog + err := row.Scan( + 
&i.ID, + &i.ExternalID, + &i.ObjectType, + &i.State, + &i.RequestID, + &i.IngestionTs, + &i.SourceTs, + &i.ProducerAppName, + &i.ProducerAppVersion, + &i.SdkName, + &i.SdkVersion, + &i.Entity, + &i.Error, + &i.SourceMetadata, + &i.CreatedAt, + &i.UpdatedAt, + &i.EntityHash, + &i.LastSeen, + &i.DuplicateCount, + ) + return i, err +} + +const incrementDuplicateCount = `-- name: IncrementDuplicateCount :exec +UPDATE ingestion_logs +SET duplicate_count = duplicate_count + 1, + last_seen = CURRENT_TIMESTAMP +WHERE id = $1 +` + +func (q *Queries) IncrementDuplicateCount(ctx context.Context, id int32) error { + _, err := q.db.Exec(ctx, incrementDuplicateCount, id) + return err +} + const retrieveIngestionLogByExternalID = `-- name: RetrieveIngestionLogByExternalID :one -SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count FROM ingestion_logs WHERE external_id = $1 ` @@ -128,12 +198,15 @@ func (q *Queries) RetrieveIngestionLogByExternalID(ctx context.Context, external &i.SourceMetadata, &i.CreatedAt, &i.UpdatedAt, + &i.EntityHash, + &i.LastSeen, + &i.DuplicateCount, ) return i, err } const retrieveIngestionLogs = `-- name: RetrieveIngestionLogs :many -SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count FROM 
ingestion_logs WHERE (state = $1 OR $1 IS NULL) AND (object_type = $2 OR $2 IS NULL) @@ -185,6 +258,9 @@ func (q *Queries) RetrieveIngestionLogs(ctx context.Context, arg RetrieveIngesti &i.SourceMetadata, &i.CreatedAt, &i.UpdatedAt, + &i.EntityHash, + &i.LastSeen, + &i.DuplicateCount, ); err != nil { return nil, err } @@ -269,7 +345,7 @@ UPDATE ingestion_logs SET state = $2, error = $3 WHERE id = $1 -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count ` type UpdateIngestionLogStateWithErrorParams struct { diff --git a/diode-server/gen/dbstore/postgres/types.go b/diode-server/gen/dbstore/postgres/types.go index 64f259d1..eba407f2 100644 --- a/diode-server/gen/dbstore/postgres/types.go +++ b/diode-server/gen/dbstore/postgres/types.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. 
// versions: -// sqlc v1.28.0 +// sqlc v1.29.0 package postgres @@ -55,6 +55,9 @@ type IngestionLog struct { SourceMetadata []byte `json:"source_metadata"` CreatedAt pgtype.Timestamptz `json:"created_at"` UpdatedAt pgtype.Timestamptz `json:"updated_at"` + EntityHash pgtype.Text `json:"entity_hash"` + LastSeen pgtype.Timestamptz `json:"last_seen"` + DuplicateCount int32 `json:"duplicate_count"` } type VDeviation struct { diff --git a/diode-server/go.mod b/diode-server/go.mod index 7358291c..acefdd2d 100644 --- a/diode-server/go.mod +++ b/diode-server/go.mod @@ -65,6 +65,7 @@ require ( github.com/go-ole/go-ole v1.3.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/gosimple/unidecode v1.0.1 // indirect + github.com/gowebpki/jcs v1.0.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/diode-server/go.sum b/diode-server/go.sum index faae53e7..1c13d579 100644 --- a/diode-server/go.sum +++ b/diode-server/go.sum @@ -87,6 +87,8 @@ github.com/gosimple/slug v1.15.0 h1:wRZHsRrRcs6b0XnxMUBM6WK1U1Vg5B0R7VkIf1Xzobo= github.com/gosimple/slug v1.15.0/go.mod h1:UiRaFH+GEilHstLUmcBgWcI42viBN7mAb818JrYOeFQ= github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6T/o= github.com/gosimple/unidecode v1.0.1/go.mod h1:CP0Cr1Y1kogOtx0bJblKzsVWrqYaqfNOnHzpgWw4Awc= +github.com/gowebpki/jcs v1.0.1 h1:Qjzg8EOkrOTuWP7DqQ1FbYtcpEbeTzUoTN9bptp8FOU= +github.com/gowebpki/jcs v1.0.1/go.mod h1:CID1cNZ+sHp1CCpAR8mPf6QRtagFBgPJE0FCUQ6+BrI= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= diff --git a/diode-server/reconciler/ingestion_processor.go 
b/diode-server/reconciler/ingestion_processor.go index 805822d4..1a8af82a 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -19,6 +19,7 @@ import ( "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" "github.com/netboxlabs/diode/diode-server/gen/netbox" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/ops" "github.com/netboxlabs/diode/diode-server/sentry" "github.com/netboxlabs/diode/diode-server/telemetry" ) @@ -76,7 +77,7 @@ type IngestionLogToProcess struct { // IngestionProcessorOps represents the basic operations that the ingestion processor performs type IngestionProcessorOps interface { - CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) + CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*ops.CreateIngestionLogResult, error) GenerateChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, branchID string) (*int32, *changeset.ChangeSet, error) ApplyChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, changeSetID int32, changeSet *changeset.ChangeSet) error } @@ -387,18 +388,36 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq SourceTs: v.GetTimestamp().AsTime().UnixNano(), } - id, err := p.ops.CreateIngestionLog(ctx, ingestionLog, nil) + result, err := p.ops.CreateIngestionLog(ctx, ingestionLog, nil) if err != nil { - errs = append(errs, fmt.Errorf("failed to create ingestion log: %v", err)) + errs = append(errs, fmt.Errorf("failed to process ingestion log: %v", err)) p.metrics.RecordIngestionLogCreate(ctx, false) continue } + ingestionLog = result.IngestionLog + id := result.ID + + if !result.WasDuplicate { + p.logger.Debug("created new ingestion log", "id", id, "externalID", 
ingestionLog.GetId()) + } else { + p.logger.Debug("ingested duplicate ingestion log", "id", id, "externalID", ingestionLog.GetId()) + } + + attrs := []attribute.KeyValue{ + attribute.Bool(telemetry.AttributeDuplicate, result.WasDuplicate), + } + ctx = telemetry.ContextWithMetricAttributes(ctx, attrs...) p.metrics.RecordIngestionLogCreate(ctx, true) - p.logger.Debug("created ingestion log", "id", id, "externalID", ingestionLog.GetId()) + if result.WasDuplicate && result.IngestionLog.State == reconcilerpb.State_IGNORED { + p.logger.Debug("skipping ingestion log because it is a duplicate of an ignored ingestion log", "id", id, "externalID", ingestionLog.GetId()) + continue + } + + // otherwise, even if it was a duplicate, reprocess to see if it has been updated generateIngestionLogChan <- IngestionLogToProcess{ - ingestionLogID: *id, + ingestionLogID: id, ingestionLog: ingestionLog, } } diff --git a/diode-server/reconciler/ingestion_processor_internal_test.go b/diode-server/reconciler/ingestion_processor_internal_test.go index 397e365b..d9579442 100644 --- a/diode-server/reconciler/ingestion_processor_internal_test.go +++ b/diode-server/reconciler/ingestion_processor_internal_test.go @@ -3,6 +3,7 @@ package reconciler import ( "bytes" "context" + "database/sql" "encoding/json" "errors" "io" @@ -38,6 +39,7 @@ func TestHandleStreamMessage(t *testing.T) { changeSetResponse *netboxdiodeplugin.ChangeSetResult changeSetError error reconcilerError bool + expectTruncate bool expectedError bool }{ { @@ -54,6 +56,7 @@ func TestHandleStreamMessage(t *testing.T) { }, changeSetResponse: &netboxdiodeplugin.ChangeSetResult{}, reconcilerError: false, + expectTruncate: true, expectedError: false, }, { @@ -114,6 +117,7 @@ func TestHandleStreamMessage(t *testing.T) { changeSetResponse: &netboxdiodeplugin.ChangeSetResult{ ID: "cs123", }, + expectTruncate: true, reconcilerError: false, expectedError: false, }, @@ -136,6 +140,7 @@ func TestHandleStreamMessage(t *testing.T) { 
changeSetResponse: &netboxdiodeplugin.ChangeSetResult{ ID: "cs123", }, + expectTruncate: true, changeSetError: errors.New("apply error"), reconcilerError: false, expectedError: false, @@ -161,7 +166,7 @@ func TestHandleStreamMessage(t *testing.T) { ReconcilerRateLimiterRPS: 20, ReconcilerRateLimiterBurst: 1, }, - ops: NewOps(mockRepository, mockNbClient, logger), + ops: NewOps(mockRepository, mockNbClient, logger, nil), metrics: mockMetrics, } @@ -207,10 +212,16 @@ func TestHandleStreamMessage(t *testing.T) { } mockNbClient.On("ApplyChangeSet", mock.Anything, mock.Anything).Return(tt.changeSetResponse, tt.changeSetError) if tt.entities[0].Entity != nil { - mockRepository.On("CreateIngestionLog", mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + mockRepository.On("CreateIngestionLog", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + // Mock FindPriorIngestionLogByEntityHash to return no duplicate found (sql.ErrNoRows) + mockRepository.On("FindPriorIngestionLogByEntityHash", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, sql.ErrNoRows) mockRepository.On("CreateChangeSet", mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) mockRepository.On("UpdateIngestionLogStateWithError", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) } + if tt.expectTruncate { + mockRepository.On("TruncateChangeSets", mock.Anything, mock.Anything, mock.Anything).Return(nil) + } + mockRedisStreamClient.On("XAck", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(redis.NewIntCmd(ctx)) mockRedisStreamClient.On("XDel", mock.Anything, mock.Anything, mock.Anything).Return(redis.NewIntCmd(ctx)) mockMetrics.On("RecordHandleMessage", mock.Anything, mock.Anything).Return() @@ -369,6 +380,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { autoApplyChangesets bool expectedStatus reconcilerpb.State expectedError bool + expectTruncate bool }{ { name: 
"generate and apply change set", @@ -409,6 +421,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { autoApplyChangesets: true, expectedStatus: reconcilerpb.State_APPLIED, expectedError: false, + expectTruncate: true, }, { name: "generate change set only", @@ -446,6 +459,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { autoApplyChangesets: false, expectedStatus: reconcilerpb.State_OPEN, expectedError: false, + expectTruncate: true, }, { name: "generate change set without changes", @@ -476,6 +490,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { autoApplyChangesets: false, expectedStatus: reconcilerpb.State_NO_CHANGES, expectedError: false, + expectTruncate: true, }, } @@ -496,7 +511,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { ReconcilerRateLimiterRPS: 20, ReconcilerRateLimiterBurst: 1, }, - ops: NewOps(mockRepository, mockNbClient, logger), + ops: NewOps(mockRepository, mockNbClient, logger, nil), metrics: mockMetrics, } @@ -509,7 +524,9 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { } mockRepository.On("UpdateIngestionLogStateWithError", ctx, ingestionLogID, tt.expectedStatus, mock.Anything).Return(nil) mockRepository.On("CreateChangeSet", ctx, mock.Anything, ingestionLogID).Return(int32Ptr(1), nil) - + if tt.expectTruncate { + mockRepository.On("TruncateChangeSets", ctx, ingestionLogID, mock.Anything).Return(nil) + } mockMetrics.On("RecordChangeSetCreate", mock.Anything, mock.Anything, mock.Anything).Return() if tt.autoApplyChangesets { mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return() diff --git a/diode-server/reconciler/ingestion_processor_test.go b/diode-server/reconciler/ingestion_processor_test.go index cd62b3af..724fd9be 100644 --- a/diode-server/reconciler/ingestion_processor_test.go +++ b/diode-server/reconciler/ingestion_processor_test.go @@ -2,6 +2,7 @@ package reconciler_test import 
( "context" + "database/sql" "log/slog" "os" "testing" @@ -16,10 +17,13 @@ import ( "google.golang.org/protobuf/proto" "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" pluginmocks "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" "github.com/netboxlabs/diode/diode-server/reconciler/mocks" + "github.com/netboxlabs/diode/diode-server/reconciler/ops" ) func int32Ptr(i int32) *int32 { return &i } @@ -57,7 +61,8 @@ func TestNewIngestionProcessor(t *testing.T) { _ = redisStreamClient.Close() }() - processor, err := reconciler.NewIngestionProcessor(ctx, logger, cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, reconciler.NewOps(mockRepository, mockNetBoxClient, logger), mockMetrics) + ops := reconciler.NewOps(mockRepository, mockNetBoxClient, logger, nil) + processor, err := reconciler.NewIngestionProcessor(ctx, logger, cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, mockMetrics) require.NoError(t, err) require.NotNil(t, processor) @@ -98,7 +103,8 @@ func TestIngestionProcessorStart(t *testing.T) { _ = redisStreamClient.Close() }() - processor, err := reconciler.NewIngestionProcessor(ctx, logger, cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, reconciler.NewOps(mockRepository, mockNetBoxClient, logger), mockMetrics) + ops := reconciler.NewOps(mockRepository, mockNetBoxClient, logger, nil) + processor, err := reconciler.NewIngestionProcessor(ctx, logger, cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, mockMetrics) require.NoError(t, err) 
require.NotNil(t, processor) @@ -270,9 +276,11 @@ func TestIngestionProcessorStart(t *testing.T) { // Wait server time.Sleep(50 * time.Millisecond) - mockRepository.On("CreateIngestionLog", mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + mockRepository.On("CreateIngestionLog", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + mockRepository.On("FindPriorIngestionLogByEntityHash", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, sql.ErrNoRows) mockRepository.On("UpdateIngestionLogStateWithError", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) mockRepository.On("CreateChangeSet", mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + mockRepository.On("TruncateChangeSets", mock.Anything, mock.Anything, mock.Anything).Return(nil) mockNetBoxClient.On("GenerateDiff", mock.Anything, mock.Anything).Return(&netboxdiodeplugin.ChangeSetResult{ ChangeSet: &netboxdiodeplugin.ChangeSet{ @@ -311,12 +319,18 @@ func TestIngestionProcessorStart(t *testing.T) { assert.NoError(t, err) // Wait for the stream to be empty (message processed) + timeout := time.After(2 * time.Second) for { streamLen, err := redisStreamClient.XLen(context.Background(), streamID).Result() assert.NoError(t, err) if streamLen == 0 { break } + select { + case <-timeout: + t.Fatal("timeout waiting for stream to be empty") + default: + } time.Sleep(100 * time.Millisecond) } @@ -326,3 +340,213 @@ func TestIngestionProcessorStart(t *testing.T) { mockRepository.AssertExpectations(t) mockNetBoxClient.AssertExpectations(t) } + +func TestIngestionProcessor_DuplicateHandling(t *testing.T) { + tests := []struct { + name string + existingLogState reconcilerpb.State + stateAfterChangeset reconcilerpb.State + makePrimary bool + expectSkipProcessing bool + changeSetHasChanges bool + createsChangeSet bool + }{ + { + name: "duplicate of IGNORED - skip processing", + existingLogState: reconcilerpb.State_IGNORED, 
+ expectSkipProcessing: true, + }, + { + name: "duplicate of QUEUED - reprocess existing", + existingLogState: reconcilerpb.State_QUEUED, + stateAfterChangeset: reconcilerpb.State_OPEN, + expectSkipProcessing: false, + changeSetHasChanges: true, + createsChangeSet: true, + }, + { + name: "duplicate of OPEN - reprocess existing", + existingLogState: reconcilerpb.State_OPEN, + stateAfterChangeset: reconcilerpb.State_OPEN, + expectSkipProcessing: false, + changeSetHasChanges: true, + createsChangeSet: true, + }, + { + name: "duplicate of FAILED - reprocess existing", + existingLogState: reconcilerpb.State_FAILED, + stateAfterChangeset: reconcilerpb.State_OPEN, + expectSkipProcessing: false, + changeSetHasChanges: true, + createsChangeSet: true, + }, + { + name: "duplicate of APPLIED with changes - reprocess existing", + existingLogState: reconcilerpb.State_APPLIED, + stateAfterChangeset: reconcilerpb.State_OPEN, + expectSkipProcessing: false, + changeSetHasChanges: true, + createsChangeSet: true, + }, + { + name: "duplicate of APPLIED without changes - no reprocessing", + existingLogState: reconcilerpb.State_APPLIED, + stateAfterChangeset: reconcilerpb.State_APPLIED, + expectSkipProcessing: false, + changeSetHasChanges: false, + createsChangeSet: false, + }, + { + name: "duplicate of NO_CHANGES with changes - reprocess existing", + existingLogState: reconcilerpb.State_NO_CHANGES, + stateAfterChangeset: reconcilerpb.State_OPEN, + expectSkipProcessing: false, + changeSetHasChanges: true, + createsChangeSet: true, + }, + { + name: "duplicate of NO_CHANGES without changes - no reprocessing", + existingLogState: reconcilerpb.State_NO_CHANGES, + stateAfterChangeset: reconcilerpb.State_NO_CHANGES, + expectSkipProcessing: false, + changeSetHasChanges: false, + createsChangeSet: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + s := miniredis.RunT(t) + s.DB(1) + defer s.Close() + + setupEnv(s.Addr()) + defer 
teardownEnv() + var cfg reconciler.Config + envconfig.MustProcess("", &cfg) + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + + mockOps := mocks.NewIngestionProcessorOps(t) + mockMetrics := mocks.NewMetrics(t) + + redisClient := redis.NewClient(&redis.Options{Addr: s.Addr(), DB: 0}) + defer func() { + _ = redisClient.Close() + }() + redisStreamClient := redis.NewClient(&redis.Options{Addr: s.Addr(), DB: 1}) + defer func() { + _ = redisStreamClient.Close() + }() + + testEntity := &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test-site-name", + }, + }, + } + + existingLogID := int32(100) + + existingLog := &reconcilerpb.IngestionLog{ + Id: "existing-log-id", + ObjectType: "dcim.site", + State: tt.existingLogState, + Entity: testEntity, + } + + duplicateResult := &ops.CreateIngestionLogResult{ + ID: existingLogID, + IngestionLog: existingLog, + WasDuplicate: true, + } + + mockOps.On("CreateIngestionLog", mock.Anything, mock.Anything, mock.Anything).Return(duplicateResult, nil) + + if !tt.expectSkipProcessing { + changeSetLogID := existingLogID + ingestionLogForChangeset := duplicateResult.IngestionLog + + changes := []changeset.Change{} + if tt.changeSetHasChanges { + changes = append(changes, changeset.Change{ + ID: "test-change", + ChangeType: "create", + ObjectType: "dcim.site", + }) + } + + mockChangeSet := &changeset.ChangeSet{ + ID: "changeset-id", + Changes: changes, + } + + mockOps.On("GenerateChangeSet", mock.Anything, changeSetLogID, ingestionLogForChangeset, "").Run(func(_ mock.Arguments) { + ingestionLogForChangeset.State = tt.stateAfterChangeset + }).Return(int32Ptr(1), mockChangeSet, nil) + + mockMetrics.On("RecordChangeSetCreate", mock.Anything, mock.Anything, mock.Anything).Return() + + if tt.stateAfterChangeset == reconcilerpb.State_OPEN { + mockOps.On("ApplyChangeSet", mock.Anything, changeSetLogID, ingestionLogForChangeset, int32(1), mockChangeSet).Return(nil) + mockMetrics.On("RecordChangeSetApply", 
mock.Anything, mock.Anything, mock.Anything).Return() + } + } + + mockMetrics.On("RecordHandleMessage", mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordIngestionLogCreate", mock.Anything, mock.Anything).Return() + + processor, err := reconciler.NewIngestionProcessor( + ctx, logger, cfg, redisClient, redisStreamClient, + reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, + mockOps, mockMetrics) + require.NoError(t, err) + + go func() { + err := processor.Start(ctx) + assert.NoError(t, err) + }() + time.Sleep(50 * time.Millisecond) + + ingestReq := &diodepb.IngestRequest{ + Id: "test-request-id", + Entities: []*diodepb.Entity{testEntity}, + } + reqBytes, err := proto.Marshal(ingestReq) + require.NoError(t, err) + + streamID := reconciler.DefaultRedisStreamID + err = redisStreamClient.XAdd(ctx, &redis.XAddArgs{ + Stream: streamID, + Values: []string{ + "request", string(reqBytes), + "ingestion_ts", "1720425600", + }, + }).Err() + require.NoError(t, err) + + timeout := time.After(2 * time.Second) + for { + streamLen, err := redisStreamClient.XLen(ctx, streamID).Result() + require.NoError(t, err) + if streamLen == 0 { + break + } + select { + case <-timeout: + t.Fatal("timeout waiting for stream to be empty") + default: + } + time.Sleep(100 * time.Millisecond) + } + + err = processor.Stop() + require.NoError(t, err) + + mockOps.AssertExpectations(t) + mockMetrics.AssertExpectations(t) + }) + } +} diff --git a/diode-server/reconciler/mocks/createingestionlogresult.go b/diode-server/reconciler/mocks/createingestionlogresult.go new file mode 100644 index 00000000..7a281d02 --- /dev/null +++ b/diode-server/reconciler/mocks/createingestionlogresult.go @@ -0,0 +1,169 @@ +// Code generated by mockery v2.53.4. DO NOT EDIT. 
+ +package mocks + +import mock "github.com/stretchr/testify/mock" + +// CreateIngestionLogResult is an autogenerated mock type for the CreateIngestionLogResult type +type CreateIngestionLogResult struct { + mock.Mock +} + +type CreateIngestionLogResult_Expecter struct { + mock *mock.Mock +} + +func (_m *CreateIngestionLogResult) EXPECT() *CreateIngestionLogResult_Expecter { + return &CreateIngestionLogResult_Expecter{mock: &_m.Mock} +} + +// ID provides a mock function with no fields +func (_m *CreateIngestionLogResult) ID() *int32 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ID") + } + + var r0 *int32 + if rf, ok := ret.Get(0).(func() *int32); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*int32) + } + } + + return r0 +} + +// CreateIngestionLogResult_ID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ID' +type CreateIngestionLogResult_ID_Call struct { + *mock.Call +} + +// ID is a helper method to define mock.On call +func (_e *CreateIngestionLogResult_Expecter) ID() *CreateIngestionLogResult_ID_Call { + return &CreateIngestionLogResult_ID_Call{Call: _e.mock.On("ID")} +} + +func (_c *CreateIngestionLogResult_ID_Call) Run(run func()) *CreateIngestionLogResult_ID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *CreateIngestionLogResult_ID_Call) Return(_a0 *int32) *CreateIngestionLogResult_ID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *CreateIngestionLogResult_ID_Call) RunAndReturn(run func() *int32) *CreateIngestionLogResult_ID_Call { + _c.Call.Return(run) + return _c +} + +// ShouldProcess provides a mock function with no fields +func (_m *CreateIngestionLogResult) ShouldProcess() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ShouldProcess") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + 
+ return r0 +} + +// CreateIngestionLogResult_ShouldProcess_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShouldProcess' +type CreateIngestionLogResult_ShouldProcess_Call struct { + *mock.Call +} + +// ShouldProcess is a helper method to define mock.On call +func (_e *CreateIngestionLogResult_Expecter) ShouldProcess() *CreateIngestionLogResult_ShouldProcess_Call { + return &CreateIngestionLogResult_ShouldProcess_Call{Call: _e.mock.On("ShouldProcess")} +} + +func (_c *CreateIngestionLogResult_ShouldProcess_Call) Run(run func()) *CreateIngestionLogResult_ShouldProcess_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *CreateIngestionLogResult_ShouldProcess_Call) Return(_a0 bool) *CreateIngestionLogResult_ShouldProcess_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *CreateIngestionLogResult_ShouldProcess_Call) RunAndReturn(run func() bool) *CreateIngestionLogResult_ShouldProcess_Call { + _c.Call.Return(run) + return _c +} + +// WasDuplicate provides a mock function with no fields +func (_m *CreateIngestionLogResult) WasDuplicate() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for WasDuplicate") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// CreateIngestionLogResult_WasDuplicate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WasDuplicate' +type CreateIngestionLogResult_WasDuplicate_Call struct { + *mock.Call +} + +// WasDuplicate is a helper method to define mock.On call +func (_e *CreateIngestionLogResult_Expecter) WasDuplicate() *CreateIngestionLogResult_WasDuplicate_Call { + return &CreateIngestionLogResult_WasDuplicate_Call{Call: _e.mock.On("WasDuplicate")} +} + +func (_c *CreateIngestionLogResult_WasDuplicate_Call) Run(run func()) *CreateIngestionLogResult_WasDuplicate_Call { + 
_c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *CreateIngestionLogResult_WasDuplicate_Call) Return(_a0 bool) *CreateIngestionLogResult_WasDuplicate_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *CreateIngestionLogResult_WasDuplicate_Call) RunAndReturn(run func() bool) *CreateIngestionLogResult_WasDuplicate_Call { + _c.Call.Return(run) + return _c +} + +// NewCreateIngestionLogResult creates a new instance of CreateIngestionLogResult. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewCreateIngestionLogResult(t interface { + mock.TestingT + Cleanup(func()) +}) *CreateIngestionLogResult { + mock := &CreateIngestionLogResult{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/diode-server/reconciler/mocks/ingestionprocessorops.go b/diode-server/reconciler/mocks/ingestionprocessorops.go index 8c40c57f..e8152985 100644 --- a/diode-server/reconciler/mocks/ingestionprocessorops.go +++ b/diode-server/reconciler/mocks/ingestionprocessorops.go @@ -9,6 +9,8 @@ import ( mock "github.com/stretchr/testify/mock" + ops "github.com/netboxlabs/diode/diode-server/reconciler/ops" + reconcilerpb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" ) @@ -76,23 +78,23 @@ func (_c *IngestionProcessorOps_ApplyChangeSet_Call) RunAndReturn(run func(conte } // CreateIngestionLog provides a mock function with given fields: ctx, ingestionLog, sourceMetadata -func (_m *IngestionProcessorOps) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) { +func (_m *IngestionProcessorOps) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*ops.CreateIngestionLogResult, error) { ret := _m.Called(ctx, ingestionLog, sourceMetadata) if len(ret) == 0 { 
panic("no return value specified for CreateIngestionLog") } - var r0 *int32 + var r0 *ops.CreateIngestionLogResult var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) (*int32, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) (*ops.CreateIngestionLogResult, error)); ok { return rf(ctx, ingestionLog, sourceMetadata) } - if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) *int32); ok { + if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) *ops.CreateIngestionLogResult); ok { r0 = rf(ctx, ingestionLog, sourceMetadata) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*int32) + r0 = ret.Get(0).(*ops.CreateIngestionLogResult) } } @@ -125,12 +127,12 @@ func (_c *IngestionProcessorOps_CreateIngestionLog_Call) Run(run func(ctx contex return _c } -func (_c *IngestionProcessorOps_CreateIngestionLog_Call) Return(_a0 *int32, _a1 error) *IngestionProcessorOps_CreateIngestionLog_Call { +func (_c *IngestionProcessorOps_CreateIngestionLog_Call) Return(_a0 *ops.CreateIngestionLogResult, _a1 error) *IngestionProcessorOps_CreateIngestionLog_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *IngestionProcessorOps_CreateIngestionLog_Call) RunAndReturn(run func(context.Context, *reconcilerpb.IngestionLog, []byte) (*int32, error)) *IngestionProcessorOps_CreateIngestionLog_Call { +func (_c *IngestionProcessorOps_CreateIngestionLog_Call) RunAndReturn(run func(context.Context, *reconcilerpb.IngestionLog, []byte) (*ops.CreateIngestionLogResult, error)) *IngestionProcessorOps_CreateIngestionLog_Call { _c.Call.Return(run) return _c } diff --git a/diode-server/reconciler/mocks/limits.go b/diode-server/reconciler/mocks/limits.go new file mode 100644 index 00000000..857fe8ae --- /dev/null +++ b/diode-server/reconciler/mocks/limits.go @@ -0,0 +1,77 @@ +// Code generated by mockery v2.53.4. DO NOT EDIT. 
+ +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Limits is an autogenerated mock type for the Limits type +type Limits struct { + mock.Mock +} + +type Limits_Expecter struct { + mock *mock.Mock +} + +func (_m *Limits) EXPECT() *Limits_Expecter { + return &Limits_Expecter{mock: &_m.Mock} +} + +// MaxChangeSetsPerIngestionLog provides a mock function with no fields +func (_m *Limits) MaxChangeSetsPerIngestionLog() int32 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for MaxChangeSetsPerIngestionLog") + } + + var r0 int32 + if rf, ok := ret.Get(0).(func() int32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int32) + } + + return r0 +} + +// Limits_MaxChangeSetsPerIngestionLog_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MaxChangeSetsPerIngestionLog' +type Limits_MaxChangeSetsPerIngestionLog_Call struct { + *mock.Call +} + +// MaxChangeSetsPerIngestionLog is a helper method to define mock.On call +func (_e *Limits_Expecter) MaxChangeSetsPerIngestionLog() *Limits_MaxChangeSetsPerIngestionLog_Call { + return &Limits_MaxChangeSetsPerIngestionLog_Call{Call: _e.mock.On("MaxChangeSetsPerIngestionLog")} +} + +func (_c *Limits_MaxChangeSetsPerIngestionLog_Call) Run(run func()) *Limits_MaxChangeSetsPerIngestionLog_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limits_MaxChangeSetsPerIngestionLog_Call) Return(_a0 int32) *Limits_MaxChangeSetsPerIngestionLog_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limits_MaxChangeSetsPerIngestionLog_Call) RunAndReturn(run func() int32) *Limits_MaxChangeSetsPerIngestionLog_Call { + _c.Call.Return(run) + return _c +} + +// NewLimits creates a new instance of Limits. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewLimits(t interface { + mock.TestingT + Cleanup(func()) +}) *Limits { + mock := &Limits{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/diode-server/reconciler/mocks/repository.go b/diode-server/reconciler/mocks/repository.go index 8a9520e6..3446ce8f 100644 --- a/diode-server/reconciler/mocks/repository.go +++ b/diode-server/reconciler/mocks/repository.go @@ -143,9 +143,9 @@ func (_c *Repository_CreateChangeSet_Call) RunAndReturn(run func(context.Context return _c } -// CreateIngestionLog provides a mock function with given fields: ctx, ingestionLog, sourceMetadata -func (_m *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) { - ret := _m.Called(ctx, ingestionLog, sourceMetadata) +// CreateIngestionLog provides a mock function with given fields: ctx, ingestionLog, sourceMetadata, entityHash +func (_m *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte, entityHash string) (*int32, error) { + ret := _m.Called(ctx, ingestionLog, sourceMetadata, entityHash) if len(ret) == 0 { panic("no return value specified for CreateIngestionLog") @@ -153,19 +153,19 @@ func (_m *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reco var r0 *int32 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) (*int32, error)); ok { - return rf(ctx, ingestionLog, sourceMetadata) + if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte, string) (*int32, error)); ok { + return rf(ctx, ingestionLog, sourceMetadata, entityHash) } - if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte) *int32); ok { - r0 = rf(ctx, ingestionLog, sourceMetadata) + if rf, ok := ret.Get(0).(func(context.Context, *reconcilerpb.IngestionLog, []byte, string) *int32); ok { + r0 = rf(ctx, ingestionLog, 
sourceMetadata, entityHash) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*int32) } } - if rf, ok := ret.Get(1).(func(context.Context, *reconcilerpb.IngestionLog, []byte) error); ok { - r1 = rf(ctx, ingestionLog, sourceMetadata) + if rf, ok := ret.Get(1).(func(context.Context, *reconcilerpb.IngestionLog, []byte, string) error); ok { + r1 = rf(ctx, ingestionLog, sourceMetadata, entityHash) } else { r1 = ret.Error(1) } @@ -182,13 +182,14 @@ type Repository_CreateIngestionLog_Call struct { // - ctx context.Context // - ingestionLog *reconcilerpb.IngestionLog // - sourceMetadata []byte -func (_e *Repository_Expecter) CreateIngestionLog(ctx interface{}, ingestionLog interface{}, sourceMetadata interface{}) *Repository_CreateIngestionLog_Call { - return &Repository_CreateIngestionLog_Call{Call: _e.mock.On("CreateIngestionLog", ctx, ingestionLog, sourceMetadata)} +// - entityHash string +func (_e *Repository_Expecter) CreateIngestionLog(ctx interface{}, ingestionLog interface{}, sourceMetadata interface{}, entityHash interface{}) *Repository_CreateIngestionLog_Call { + return &Repository_CreateIngestionLog_Call{Call: _e.mock.On("CreateIngestionLog", ctx, ingestionLog, sourceMetadata, entityHash)} } -func (_c *Repository_CreateIngestionLog_Call) Run(run func(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte)) *Repository_CreateIngestionLog_Call { +func (_c *Repository_CreateIngestionLog_Call) Run(run func(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte, entityHash string)) *Repository_CreateIngestionLog_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*reconcilerpb.IngestionLog), args[2].([]byte)) + run(args[0].(context.Context), args[1].(*reconcilerpb.IngestionLog), args[2].([]byte), args[3].(string)) }) return _c } @@ -198,7 +199,123 @@ func (_c *Repository_CreateIngestionLog_Call) Return(_a0 *int32, _a1 error) *Rep return _c } -func (_c 
*Repository_CreateIngestionLog_Call) RunAndReturn(run func(context.Context, *reconcilerpb.IngestionLog, []byte) (*int32, error)) *Repository_CreateIngestionLog_Call { +func (_c *Repository_CreateIngestionLog_Call) RunAndReturn(run func(context.Context, *reconcilerpb.IngestionLog, []byte, string) (*int32, error)) *Repository_CreateIngestionLog_Call { + _c.Call.Return(run) + return _c +} + +// FindPriorIngestionLogByEntityHash provides a mock function with given fields: ctx, entityHash, currentBranch +func (_m *Repository) FindPriorIngestionLogByEntityHash(ctx context.Context, entityHash string, currentBranch *string) (*int32, *reconcilerpb.IngestionLog, error) { + ret := _m.Called(ctx, entityHash, currentBranch) + + if len(ret) == 0 { + panic("no return value specified for FindPriorIngestionLogByEntityHash") + } + + var r0 *int32 + var r1 *reconcilerpb.IngestionLog + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string, *string) (*int32, *reconcilerpb.IngestionLog, error)); ok { + return rf(ctx, entityHash, currentBranch) + } + if rf, ok := ret.Get(0).(func(context.Context, string, *string) *int32); ok { + r0 = rf(ctx, entityHash, currentBranch) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*int32) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, *string) *reconcilerpb.IngestionLog); ok { + r1 = rf(ctx, entityHash, currentBranch) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*reconcilerpb.IngestionLog) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, string, *string) error); ok { + r2 = rf(ctx, entityHash, currentBranch) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// Repository_FindPriorIngestionLogByEntityHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindPriorIngestionLogByEntityHash' +type Repository_FindPriorIngestionLogByEntityHash_Call struct { + *mock.Call +} + +// FindPriorIngestionLogByEntityHash is a helper method to define 
mock.On call +// - ctx context.Context +// - entityHash string +// - currentBranch *string +func (_e *Repository_Expecter) FindPriorIngestionLogByEntityHash(ctx interface{}, entityHash interface{}, currentBranch interface{}) *Repository_FindPriorIngestionLogByEntityHash_Call { + return &Repository_FindPriorIngestionLogByEntityHash_Call{Call: _e.mock.On("FindPriorIngestionLogByEntityHash", ctx, entityHash, currentBranch)} +} + +func (_c *Repository_FindPriorIngestionLogByEntityHash_Call) Run(run func(ctx context.Context, entityHash string, currentBranch *string)) *Repository_FindPriorIngestionLogByEntityHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(*string)) + }) + return _c +} + +func (_c *Repository_FindPriorIngestionLogByEntityHash_Call) Return(_a0 *int32, _a1 *reconcilerpb.IngestionLog, _a2 error) *Repository_FindPriorIngestionLogByEntityHash_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *Repository_FindPriorIngestionLogByEntityHash_Call) RunAndReturn(run func(context.Context, string, *string) (*int32, *reconcilerpb.IngestionLog, error)) *Repository_FindPriorIngestionLogByEntityHash_Call { + _c.Call.Return(run) + return _c +} + +// IncrementDuplicateCount provides a mock function with given fields: ctx, id +func (_m *Repository) IncrementDuplicateCount(ctx context.Context, id int32) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for IncrementDuplicateCount") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int32) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Repository_IncrementDuplicateCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementDuplicateCount' +type Repository_IncrementDuplicateCount_Call struct { + *mock.Call +} + +// IncrementDuplicateCount is a helper method to define mock.On call +// - ctx 
context.Context +// - id int32 +func (_e *Repository_Expecter) IncrementDuplicateCount(ctx interface{}, id interface{}) *Repository_IncrementDuplicateCount_Call { + return &Repository_IncrementDuplicateCount_Call{Call: _e.mock.On("IncrementDuplicateCount", ctx, id)} +} + +func (_c *Repository_IncrementDuplicateCount_Call) Run(run func(ctx context.Context, id int32)) *Repository_IncrementDuplicateCount_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int32)) + }) + return _c +} + +func (_c *Repository_IncrementDuplicateCount_Call) Return(_a0 error) *Repository_IncrementDuplicateCount_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Repository_IncrementDuplicateCount_Call) RunAndReturn(run func(context.Context, int32) error) *Repository_IncrementDuplicateCount_Call { _c.Call.Return(run) return _c } @@ -452,6 +569,54 @@ func (_c *Repository_RetrieveIngestionLogs_Call) RunAndReturn(run func(context.C return _c } +// TruncateChangeSets provides a mock function with given fields: ctx, ingestionLogID, limit +func (_m *Repository) TruncateChangeSets(ctx context.Context, ingestionLogID int32, limit int32) error { + ret := _m.Called(ctx, ingestionLogID, limit) + + if len(ret) == 0 { + panic("no return value specified for TruncateChangeSets") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int32, int32) error); ok { + r0 = rf(ctx, ingestionLogID, limit) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Repository_TruncateChangeSets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TruncateChangeSets' +type Repository_TruncateChangeSets_Call struct { + *mock.Call +} + +// TruncateChangeSets is a helper method to define mock.On call +// - ctx context.Context +// - ingestionLogID int32 +// - limit int32 +func (_e *Repository_Expecter) TruncateChangeSets(ctx interface{}, ingestionLogID interface{}, limit interface{}) *Repository_TruncateChangeSets_Call { + 
return &Repository_TruncateChangeSets_Call{Call: _e.mock.On("TruncateChangeSets", ctx, ingestionLogID, limit)} +} + +func (_c *Repository_TruncateChangeSets_Call) Run(run func(ctx context.Context, ingestionLogID int32, limit int32)) *Repository_TruncateChangeSets_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int32), args[2].(int32)) + }) + return _c +} + +func (_c *Repository_TruncateChangeSets_Call) Return(_a0 error) *Repository_TruncateChangeSets_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Repository_TruncateChangeSets_Call) RunAndReturn(run func(context.Context, int32, int32) error) *Repository_TruncateChangeSets_Call { + _c.Call.Return(run) + return _c +} + // UpdateIngestionLogStateWithError provides a mock function with given fields: ctx, id, state, err func (_m *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, err error) error { ret := _m.Called(ctx, id, state, err) diff --git a/diode-server/reconciler/ops.go b/diode-server/reconciler/ops.go index 05548d13..358aac64 100644 --- a/diode-server/reconciler/ops.go +++ b/diode-server/reconciler/ops.go @@ -2,37 +2,97 @@ package reconciler import ( "context" + "database/sql" "errors" + "fmt" "log/slog" + "github.com/netboxlabs/diode/diode-server/entityhash" diodeErrors "github.com/netboxlabs/diode/diode-server/errors" "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/reconciler/applier" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" "github.com/netboxlabs/diode/diode-server/reconciler/differ" + "github.com/netboxlabs/diode/diode-server/reconciler/ops" "github.com/netboxlabs/diode/diode-server/sentry" ) +// Limits is an interface that provides limits for the reconciler operations to enforce +type Limits interface { + MaxChangeSetsPerIngestionLog() int32 +} + +// 
DefaultLimits is the default implementation of the Limits interface +type DefaultLimits struct{} + +// MaxChangeSetsPerIngestionLog returns retention limit for change sets per ingestion log +func (l *DefaultLimits) MaxChangeSetsPerIngestionLog() int32 { + return 5 +} + // Ops high level operations performed during ingestion processing type Ops struct { repository Repository nbClient netboxdiodeplugin.NetBoxAPI logger *slog.Logger + limits Limits } // NewOps creates a new Ops -func NewOps(repository Repository, nbClient netboxdiodeplugin.NetBoxAPI, logger *slog.Logger) *Ops { +func NewOps(repository Repository, nbClient netboxdiodeplugin.NetBoxAPI, logger *slog.Logger, limits Limits) *Ops { + if limits == nil { + limits = &DefaultLimits{} + } + return &Ops{ repository: repository, nbClient: nbClient, logger: logger, + limits: limits, } } // CreateIngestionLog creates a record for a newly received ingestion log -func (o *Ops) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) { - return o.repository.CreateIngestionLog(ctx, ingestionLog, sourceMetadata) +func (o *Ops) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*ops.CreateIngestionLogResult, error) { + // TODO: this should be in a transaction. 
+ + fingerprinter := entityhash.NewEntityFingerprinter() + entityHash, err := fingerprinter.GenerateEntityHash(ingestionLog.Entity) + if err != nil { + return nil, fmt.Errorf("failed to generate entity hash: %w", err) + } + + existingID, existingLog, err := o.repository.FindPriorIngestionLogByEntityHash(ctx, entityHash, nil) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to search for prior deviation: %w", err) + } + + if existingID == nil { + id, err := o.repository.CreateIngestionLog(ctx, ingestionLog, sourceMetadata, entityHash) + if err != nil { + return nil, err + } + + result := &ops.CreateIngestionLogResult{ + ID: *id, + IngestionLog: ingestionLog, + } + return result, nil + } + + // It was a duplicate, increment the duplicate count and return the prior ingestion log + if err := o.repository.IncrementDuplicateCount(ctx, *existingID); err != nil { + return nil, fmt.Errorf("failed to mark record as duplicate: %w", err) + } + + result := &ops.CreateIngestionLogResult{ + ID: *existingID, + IngestionLog: existingLog, + WasDuplicate: true, + } + + return result, nil } // GenerateChangeSet creates a change set based on current NetBox state with optional branch @@ -72,22 +132,44 @@ func (o *Ops) GenerateChangeSet(ctx context.Context, ingestionLogID int32, inges return id, cs, err } + // if the change set has no changes and the ingestion log is already in the no changes or applied state, + // we don't record another changeset in the database, we just bump the updated at time. 
+ if len(changeSet.Changes) == 0 && (ingestionLog.State == reconcilerpb.State_NO_CHANGES || ingestionLog.State == reconcilerpb.State_APPLIED) { + if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, ingestionLog.State, nil); err != nil { + o.logger.Error("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err) + } + return nil, changeSet, nil + } + + // TODO: At this point if the prior ingestion log is in the applied state, and we have + // a new set of changes, we could "clone" the ingestion log to open a new "deviation" + // and leave the prior one as applied. This would be more historically accurate / + // less surprising. For now we just re-open the previously applied change set. + // + // If we did create a new one, we would need to communicate that back to the rest + // of the pipeline and also this operation's name would be a bit of misnomer. + // Possibly some refactoring/renaming of the operations (which are meant to + // keep rpc and pipeline behavior in sync) would be warranted. + changeSetID, err := o.repository.CreateChangeSet(ctx, *changeSet, ingestionLogID) if err != nil { return nil, nil, err } - state := reconcilerpb.State_OPEN - if len(changeSet.Changes) == 0 { - state = reconcilerpb.State_NO_CHANGES + maxChangeSets := o.limits.MaxChangeSetsPerIngestionLog() + if maxChangeSets > 0 { + if err := o.repository.TruncateChangeSets(ctx, ingestionLogID, maxChangeSets); err != nil { + o.logger.Error("failed to truncate change sets (error ignored)", "ingestionLogID", ingestionLogID, "error", err) + } } - ingestionLog.State = state - - if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, state, nil); err != nil { - o.logger.Warn("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err) - // TODO(ltucker): This should be in a transaction. Can leave an inconsistent state marked on the ingestion log. 
- // return nil, err + if len(changeSet.Changes) > 0 { + ingestionLog.State = reconcilerpb.State_OPEN + } else { + ingestionLog.State = reconcilerpb.State_NO_CHANGES + } + if err := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, ingestionLog.State, nil); err != nil { + o.logger.Error("failed to update ingestion log state (error ignored)", "ingestionLogID", ingestionLogID, "error", err) } o.logger.Debug("change set generated", "id", changeSetID, "externalID", changeSet.ID, "ingestionLogID", ingestionLogID) diff --git a/diode-server/reconciler/ops/types.go b/diode-server/reconciler/ops/types.go new file mode 100644 index 00000000..a7e2252f --- /dev/null +++ b/diode-server/reconciler/ops/types.go @@ -0,0 +1,10 @@ +package ops + +import "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + +// CreateIngestionLogResult represents the result of creating an ingestion log. +type CreateIngestionLogResult struct { + ID int32 + IngestionLog *reconcilerpb.IngestionLog + WasDuplicate bool // true if the ingestion log was a duplicate, in this case the prior ingestion log is returned +} diff --git a/diode-server/reconciler/ops_test.go b/diode-server/reconciler/ops_test.go index 069903e5..2d9483f4 100644 --- a/diode-server/reconciler/ops_test.go +++ b/diode-server/reconciler/ops_test.go @@ -2,6 +2,7 @@ package reconciler_test import ( "context" + "database/sql" "encoding/json" "fmt" "log/slog" @@ -138,7 +139,7 @@ func TestOpsGenerateChangeSet(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mockRepository := mocks.NewRepository(t) mockNetBoxClient := pluginmocks.NewNetBoxAPI(t) - ops := reconciler.NewOps(mockRepository, mockNetBoxClient, logger) + ops := reconciler.NewOps(mockRepository, mockNetBoxClient, logger, nil) for _, m := range tt.generateDiff { if m.err == nil { @@ -204,3 +205,134 @@ func strPtrEq(a *string, b *string) bool { func strPtr(s string) *string { return &s } + +func TestOpsCreateIngestionLog(t *testing.T) { + ctx := 
context.Background() + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) + + testEntity := &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test-site-1", + }, + }, + } + + testIngestionLog := &pb.IngestionLog{ + Id: "8a8ae517-85b9-466e-890c-aadb0771cc9e", + ObjectType: netbox.SiteObjectType, + State: pb.State_QUEUED, + RequestId: "1abf059c-496f-4037-83c2-0e9b1d021e85", + Entity: testEntity, + } + + testSourceMetadata := []byte(`{"source": "test"}`) + + type mockCreateIngestionLog struct { + id *int32 + error error + } + + tests := []struct { + name string + + ingestionLog *pb.IngestionLog + sourceMetadata []byte + + // Mock expectations + mockCreateIngestionLog *mockCreateIngestionLog + + mockFindPriorIngestionLogID *int32 + mockFindPriorIngestionLog *pb.IngestionLog + mockFindPriorIngestionLogError error + + expectedError string + expectWasDuplicate bool + }{ + { + name: "no duplicate found - successful creation", + ingestionLog: testIngestionLog, + sourceMetadata: testSourceMetadata, + + mockCreateIngestionLog: &mockCreateIngestionLog{ + id: int32Ptr(1234), + error: nil, + }, + mockFindPriorIngestionLogError: sql.ErrNoRows, + + expectedError: "", + expectWasDuplicate: false, + }, + { + name: "duplicate found", + ingestionLog: testIngestionLog, + sourceMetadata: testSourceMetadata, + + mockFindPriorIngestionLogID: int32Ptr(5678), + mockFindPriorIngestionLog: testIngestionLog, + mockFindPriorIngestionLogError: nil, + + expectedError: "", + expectWasDuplicate: true, + }, + { + name: "create ingestion log fails", + ingestionLog: testIngestionLog, + sourceMetadata: testSourceMetadata, + + mockCreateIngestionLog: &mockCreateIngestionLog{ + error: fmt.Errorf("database error"), + }, + + expectedError: "database error", + expectWasDuplicate: false, + }, + { + name: "duplicate search fails with non-NoRows error", + ingestionLog: testIngestionLog, + sourceMetadata: 
testSourceMetadata, + + mockFindPriorIngestionLogError: fmt.Errorf("database connection error"), + + expectedError: "failed to search for prior deviation: database connection error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockRepository := mocks.NewRepository(t) + mockNetBoxClient := pluginmocks.NewNetBoxAPI(t) + opsInstance := reconciler.NewOps(mockRepository, mockNetBoxClient, logger, nil) + + mockRepository.EXPECT().FindPriorIngestionLogByEntityHash(mock.Anything, mock.AnythingOfType("string"), (*string)(nil)). + Return(tt.mockFindPriorIngestionLogID, tt.mockFindPriorIngestionLog, tt.mockFindPriorIngestionLogError) + + // Mock CreateIngestionLog + if tt.mockCreateIngestionLog != nil { + mockRepository.EXPECT().CreateIngestionLog(mock.Anything, tt.ingestionLog, tt.sourceMetadata, mock.AnythingOfType("string")). + Return(tt.mockCreateIngestionLog.id, tt.mockCreateIngestionLog.error) + } + + if tt.expectWasDuplicate { + mockRepository.EXPECT().IncrementDuplicateCount(mock.Anything, *tt.mockFindPriorIngestionLogID).Return(nil) + } + + result, err := opsInstance.CreateIngestionLog(ctx, tt.ingestionLog, tt.sourceMetadata) + + if tt.expectedError != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.expectedError) + return + } + + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.IngestionLog) + if tt.mockCreateIngestionLog != nil { + require.Equal(t, *tt.mockCreateIngestionLog.id, result.ID) + } + require.Equal(t, tt.ingestionLog, result.IngestionLog) + require.Equal(t, tt.expectWasDuplicate, result.WasDuplicate) + }) + } +} diff --git a/diode-server/reconciler/repository.go b/diode-server/reconciler/repository.go index dd4b1d27..1825f633 100644 --- a/diode-server/reconciler/repository.go +++ b/diode-server/reconciler/repository.go @@ -9,7 +9,7 @@ import ( // Repository is an interface for interacting with ingestion logs and change sets. 
type Repository interface { - CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) + CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte, entityHash string) (*int32, error) UpdateIngestionLogStateWithError(ctx context.Context, id int32, state reconcilerpb.State, err error) error RetrieveIngestionLogByExternalID(ctx context.Context, uuid string) (*int32, *reconcilerpb.IngestionLog, error) RetrieveIngestionLogs(ctx context.Context, filter *reconcilerpb.RetrieveIngestionLogsRequest, limit int32, offset int32) ([]*reconcilerpb.IngestionLog, error) @@ -17,4 +17,8 @@ type Repository interface { CreateChangeSet(ctx context.Context, changeSet changeset.ChangeSet, ingestionLogID int32) (*int32, error) RetrieveDeviations(ctx context.Context, filter *reconcilerpb.RetrieveDeviationsRequest, limit int32, offset int32) ([]*reconcilerpb.Deviation, error) RetrieveDeviationByID(ctx context.Context, externalID string) (*reconcilerpb.Deviation, error) + + FindPriorIngestionLogByEntityHash(ctx context.Context, entityHash string, currentBranch *string) (*int32, *reconcilerpb.IngestionLog, error) + IncrementDuplicateCount(ctx context.Context, id int32) error + TruncateChangeSets(ctx context.Context, ingestionLogID int32, limit int32) error } diff --git a/diode-server/telemetry/constants.go b/diode-server/telemetry/constants.go index c1db91ab..188e5d16 100644 --- a/diode-server/telemetry/constants.go +++ b/diode-server/telemetry/constants.go @@ -17,4 +17,6 @@ const ( AttributeStream = "stream" // AttributeState is the state of the request AttributeState = "state" + // AttributeDuplicate is a boolean attribute that indicates if an ingestion log was a duplicate + AttributeDuplicate = "duplicate" )