Skip to content

WIP feat: basic deduplication of ingested entities #383

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions diode-proto/diode/v1/reconciler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ enum State {
NO_CHANGES = 5;
IGNORED = 6;
ERRORED = 7;
DUPLICATE = 8;
}

// Ingestion metrics
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
-- +goose Up

-- Deduplication support:
--   entity_hash      - fingerprint of the entity payload; 64 chars fits a
--                      SHA-256 hex digest (see entityhash package)
--   duplicate_of_id  - self-referencing FK linking a duplicate row to its primary
--   last_seen        - most recent created_at among the primary and its duplicates
--   duplicate_count  - number of duplicates attached to a primary row
-- last_seen / duplicate_count are maintained by the trigger defined below.
ALTER TABLE ingestion_logs
ADD COLUMN entity_hash VARCHAR(64),
ADD COLUMN duplicate_of_id INTEGER REFERENCES ingestion_logs(id),
ADD COLUMN last_seen TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN duplicate_count INTEGER DEFAULT 0 NOT NULL;

-- Partial index: hash lookups only ever target primary rows
-- (duplicate_of_id IS NULL), so exclude duplicates from the index.
CREATE INDEX idx_ingestion_logs_entity_hash_primary ON ingestion_logs(entity_hash) WHERE duplicate_of_id IS NULL;
-- Supports the trigger's aggregate scan over a primary's duplicates.
CREATE INDEX idx_ingestion_logs_duplicate_of_id ON ingestion_logs(duplicate_of_id);

-- Must be recreated after adding duplicate_of_id column (not stored as query in pg)
-- It also must not alter the order of the columns in the view in order to be
-- performed as a CREATE OR REPLACE (vs a DROP and re-CREATE)
-- Consequently the new dedup columns (duplicate_of_id, last_seen,
-- duplicate_count) are appended AFTER change_set/changes instead of being
-- grouped with the other ingestion_logs columns.
CREATE OR REPLACE VIEW v_deviations AS
SELECT DISTINCT ON (ingestion_logs.id)
ingestion_logs.id,
ingestion_logs.external_id,
ingestion_logs.object_type,
ingestion_logs.state,
ingestion_logs.request_id,
ingestion_logs.ingestion_ts,
ingestion_logs.source_ts,
ingestion_logs.producer_app_name,
ingestion_logs.producer_app_version,
ingestion_logs.sdk_name,
ingestion_logs.sdk_version,
ingestion_logs.entity,
ingestion_logs.error,
ingestion_logs.source_metadata,
ingestion_logs.created_at,
ingestion_logs.updated_at,
-- One change_set row per log (latest wins via DISTINCT ON + ORDER BY below),
-- with its changes aggregated into a JSON array ordered by sequence_number.
row_to_json(change_sets.*) AS change_set,
JSON_AGG(changes.* ORDER BY changes.sequence_number ASC)
FILTER ( WHERE changes.id IS NOT NULL ) AS changes,
ingestion_logs.duplicate_of_id,
ingestion_logs.last_seen,
ingestion_logs.duplicate_count
FROM ingestion_logs
LEFT JOIN change_sets on ingestion_logs.id = change_sets.ingestion_log_id
LEFT JOIN changes on change_sets.id = changes.change_set_id
GROUP BY ingestion_logs.id, change_sets.id
ORDER BY ingestion_logs.id DESC, change_sets.id DESC;

-- Trigger function maintaining last_seen and duplicate_count on the primary
-- row of each duplicate group. Fired AFTER INSERT and AFTER UPDATE on
-- ingestion_logs (see triggers below).
-- +goose StatementBegin
CREATE OR REPLACE FUNCTION update_ingestion_logs_duplicate_stats()
RETURNS TRIGGER AS $$
DECLARE
    primary_id INTEGER;
    latest_created_at TIMESTAMP WITH TIME ZONE;
    dup_count INTEGER;
BEGIN
    -- Skip recursive trigger calls (avoid infinite recursion).
    -- The stats UPDATE issued below only touches last_seen/duplicate_count,
    -- so if duplicate_of_id, created_at and entity_hash are all unchanged we
    -- know this invocation was caused by our own UPDATE and can bail out.
    IF TG_OP = 'UPDATE' AND
       OLD.duplicate_of_id IS NOT DISTINCT FROM NEW.duplicate_of_id AND
       OLD.created_at IS NOT DISTINCT FROM NEW.created_at AND
       OLD.entity_hash IS NOT DISTINCT FROM NEW.entity_hash THEN
        RETURN NEW;
    END IF;

    -- Determine the primary record ID: the row's primary if it is a
    -- duplicate, otherwise the row itself. The trigger only fires on INSERT
    -- and UPDATE and the expression is identical for both, so no branching on
    -- TG_OP is needed (the original IF/ELSIF arms were exact duplicates).
    primary_id := COALESCE(NEW.duplicate_of_id, NEW.id);

    -- Recompute latest created_at and duplicate count across the whole group
    -- (the primary plus all rows pointing at it).
    SELECT
        MAX(created_at),
        COUNT(*) - 1 -- Subtract 1 to exclude the primary record itself
    INTO latest_created_at, dup_count
    FROM ingestion_logs
    WHERE id = primary_id OR duplicate_of_id = primary_id;

    -- Update only the primary record with duplicate stats.
    -- This fires the UPDATE trigger again, but the recursion check above
    -- prevents an infinite loop.
    UPDATE ingestion_logs
    SET
        last_seen = latest_created_at,
        duplicate_count = dup_count
    WHERE id = primary_id;

    -- The return value of an AFTER row-level trigger is ignored by Postgres;
    -- NEW is returned by convention (always non-NULL for INSERT/UPDATE).
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- +goose StatementEnd

-- Attach the stats-maintenance function to both write paths.
CREATE TRIGGER trg_ingestion_logs_duplicate_stats_insert
    AFTER INSERT ON ingestion_logs
    FOR EACH ROW
EXECUTE FUNCTION update_ingestion_logs_duplicate_stats();

CREATE TRIGGER trg_ingestion_logs_duplicate_stats_update
    AFTER UPDATE ON ingestion_logs
    FOR EACH ROW
EXECUTE FUNCTION update_ingestion_logs_duplicate_stats();

-- Initialize existing data: set last_seen to created_at for every record.
-- BUGFIX: the previous `WHERE last_seen IS NULL` filter matched no rows,
-- because ADD COLUMN ... DEFAULT CURRENT_TIMESTAMP backfills all existing
-- rows with the (non-NULL) migration timestamp. Every row present at this
-- point of the migration is pre-existing, so the unfiltered UPDATE is
-- intentional. The UPDATE trigger fires per row but the recursion guard in
-- update_ingestion_logs_duplicate_stats() short-circuits it (only last_seen
-- changes).
UPDATE ingestion_logs SET last_seen = created_at;


-- +goose Down

-- Drop the view first: it references the dedup columns removed below and
-- would otherwise block the ALTER TABLE.
DROP VIEW IF EXISTS v_deviations;

-- Triggers before function (triggers depend on the function).
DROP TRIGGER IF EXISTS trg_ingestion_logs_duplicate_stats_update ON ingestion_logs;
DROP TRIGGER IF EXISTS trg_ingestion_logs_duplicate_stats_insert ON ingestion_logs;
DROP FUNCTION IF EXISTS update_ingestion_logs_duplicate_stats();

DROP INDEX IF EXISTS idx_ingestion_logs_entity_hash_primary;
DROP INDEX IF EXISTS idx_ingestion_logs_duplicate_of_id;

ALTER TABLE ingestion_logs
DROP COLUMN IF EXISTS entity_hash,
DROP COLUMN IF EXISTS duplicate_of_id,
DROP COLUMN IF EXISTS last_seen,
DROP COLUMN IF EXISTS duplicate_count;

-- Recreate the pre-migration view, without any of the deduplication columns
CREATE VIEW v_deviations AS
SELECT DISTINCT ON (ingestion_logs.id) ingestion_logs.*,
row_to_json(change_sets.*) AS change_set,
JSON_AGG(changes.* ORDER BY changes.sequence_number ASC)
FILTER ( WHERE changes.id IS NOT NULL ) AS changes
FROM ingestion_logs
LEFT JOIN change_sets on ingestion_logs.id = change_sets.ingestion_log_id
LEFT JOIN changes on change_sets.id = changes.change_set_id
GROUP BY ingestion_logs.id, change_sets.id
ORDER BY ingestion_logs.id DESC, change_sets.id DESC;
39 changes: 37 additions & 2 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- name: CreateIngestionLog :one
INSERT INTO ingestion_logs (external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name,
producer_app_version, sdk_name, sdk_version, entity, source_metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
producer_app_version, sdk_name, sdk_version, entity, source_metadata, entity_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
RETURNING *;

-- name: UpdateIngestionLogStateWithError :exec
Expand All @@ -14,6 +14,7 @@ RETURNING *;
-- name: CountIngestionLogsPerState :many
SELECT state, COUNT(*) AS count
FROM ingestion_logs
WHERE (duplicate_of_id IS NULL OR sqlc.narg('include_duplicates')::boolean = true)
GROUP BY state;

-- name: RetrieveIngestionLogs :many
Expand All @@ -23,6 +24,7 @@ WHERE (state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
AND (object_type = sqlc.narg('object_type') OR sqlc.narg('object_type') IS NULL)
AND (ingestion_ts >= sqlc.narg('ingestion_ts_start') OR sqlc.narg('ingestion_ts_start') IS NULL)
AND (ingestion_ts <= sqlc.narg('ingestion_ts_end') OR sqlc.narg('ingestion_ts_end') IS NULL)
AND (duplicate_of_id IS NULL OR sqlc.narg('include_duplicates')::boolean = true)
ORDER BY id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

Expand All @@ -40,5 +42,38 @@ WHERE (v_deviations.state = sqlc.narg('state') OR sqlc.narg('state') IS NULL)
sqlc.narg('ingestion_ts_start') IS NULL)
AND (v_deviations.ingestion_ts <= sqlc.narg('ingestion_ts_end') OR
sqlc.narg('ingestion_ts_end') IS NULL)
AND (v_deviations.duplicate_of_id IS NULL OR sqlc.narg('include_duplicates')::boolean = true)
ORDER BY v_deviations.id DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

-- name: FindPriorIngestionLogByEntityHash :one
-- Returns the most recent primary (non-duplicate) ingestion log matching the
-- given entity hash, scoped to the same branch context: the candidate's
-- latest change set must carry the given branch_id, or carry no branch when
-- branch_id is NULL. Returns sql no-rows error when no prior log exists.
WITH latest_change_sets AS (
-- Latest change set per ingestion log (highest change_sets.id wins).
SELECT DISTINCT ON (ingestion_log_id)
ingestion_log_id,
branch_id
FROM change_sets
ORDER BY ingestion_log_id, id DESC
)
SELECT il.*
FROM ingestion_logs il
-- LEFT JOIN: logs without any change set are matched when branch_id is NULL.
LEFT JOIN latest_change_sets lcs ON il.id = lcs.ingestion_log_id
WHERE il.entity_hash = sqlc.arg('entity_hash')
AND il.duplicate_of_id IS NULL
AND (
(sqlc.narg('branch_id')::text IS NOT NULL AND lcs.branch_id = sqlc.narg('branch_id')::text)
OR
(sqlc.narg('branch_id')::text IS NULL AND lcs.branch_id IS NULL)
)
ORDER BY il.created_at DESC
LIMIT 1;

-- name: SetIngestionLogDuplicateOfID :exec
-- Sets (or clears, when $2 is NULL) the primary-log reference of a single
-- ingestion log; the stats trigger recomputes last_seen/duplicate_count.
UPDATE ingestion_logs
SET duplicate_of_id = $2
WHERE id = $1;

-- name: RetrieveIngestionLogDuplicateOfID :one
-- Fetches only the duplicate_of_id of one log: NULL means the log is a
-- primary, non-NULL is the id of its primary.
SELECT duplicate_of_id
FROM ingestion_logs
WHERE id = $1;

68 changes: 64 additions & 4 deletions diode-server/dbstore/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ func NewRepository(pool *pgxpool.Pool) *Repository {
}
}

// CreateIngestionLog creates a new ingestion log.
func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte) (*int32, error) {
// CreateIngestionLog creates a new ingestion log with entity hash and deduplication fields.
func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *reconcilerpb.IngestionLog, sourceMetadata []byte, entityHash string) (*int32, error) {
marshaler := protojson.MarshalOptions{
UseProtoNames: true,
}
entityJSON, err := marshaler.Marshal(ingestionLog.Entity)
if err != nil {
return nil, fmt.Errorf("failed to marshal entity: %w", err)
}

params := postgres.CreateIngestionLogParams{
ExternalID: ingestionLog.Id,
ObjectType: pgtype.Text{String: ingestionLog.ObjectType, Valid: true},
Expand All @@ -51,6 +52,7 @@ func (r *Repository) CreateIngestionLog(ctx context.Context, ingestionLog *recon
SdkVersion: pgtype.Text{String: ingestionLog.SdkVersion, Valid: true},
Entity: entityJSON,
SourceMetadata: sourceMetadata,
EntityHash: pgtype.Text{String: entityHash, Valid: true},
}

createdIngestionLog, err := r.queries.CreateIngestionLog(ctx, params)
Expand All @@ -70,6 +72,7 @@ func (r *Repository) RetrieveIngestionLogByExternalID(ctx context.Context, uuid
if err != nil {
return nil, nil, err
}

return &ingestionLog.ID, log, nil
}

Expand All @@ -91,8 +94,8 @@ func (r *Repository) UpdateIngestionLogStateWithError(ctx context.Context, id in
}

// CountIngestionLogsPerState counts ingestion logs per state.
func (r *Repository) CountIngestionLogsPerState(ctx context.Context) (map[reconcilerpb.State]int32, error) {
counts, err := r.queries.CountIngestionLogsPerState(ctx)
func (r *Repository) CountIngestionLogsPerState(ctx context.Context, includeDuplicates bool) (map[reconcilerpb.State]int32, error) {
counts, err := r.queries.CountIngestionLogsPerState(ctx, pgtype.Bool{Bool: includeDuplicates, Valid: true})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -434,3 +437,60 @@ func deviationToProto(dbDeviation postgres.VDeviation) (*reconcilerpb.Deviation,

return deviation, nil
}

// FindPriorIngestionLogByEntityHash finds a prior deviation with the same entity hash, considering branch context.
// A nil currentBranch selects logs whose latest change set has no branch.
func (r *Repository) FindPriorIngestionLogByEntityHash(ctx context.Context, entityHash string, currentBranch *string) (*int32, *reconcilerpb.IngestionLog, error) {
	// An invalid (zero) pgtype.Text maps to SQL NULL for the branch filter.
	var branchID pgtype.Text
	if currentBranch != nil {
		branchID = pgtype.Text{String: *currentBranch, Valid: true}
	}

	row, err := r.queries.FindPriorIngestionLogByEntityHash(ctx, postgres.FindPriorIngestionLogByEntityHashParams{
		EntityHash: pgtype.Text{String: entityHash, Valid: true},
		BranchID:   branchID,
	})
	if err != nil {
		return nil, nil, err
	}

	protoLog, err := row.ToProto()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to convert to proto: %w", err)
	}

	return &row.ID, protoLog, nil
}

// MarkIngestionLogAsDuplicate marks an ingestion log as a duplicate of another.
// duplicateID is the row being demoted; primaryID is the row it points at.
func (r *Repository) MarkIngestionLogAsDuplicate(ctx context.Context, duplicateID int32, primaryID int32) error {
	return r.queries.SetIngestionLogDuplicateOfID(ctx, postgres.SetIngestionLogDuplicateOfIDParams{
		ID:            duplicateID,
		DuplicateOfID: pgtype.Int4{Int32: primaryID, Valid: true},
	})
}

// MarkIngestionLogAsPrimary promotes a duplicate to a new primary log by
// clearing its duplicate_of_id.
func (r *Repository) MarkIngestionLogAsPrimary(ctx context.Context, duplicateID int32) error {
	// The zero-valued pgtype.Int4 (Valid == false) is written as SQL NULL.
	var clearedRef pgtype.Int4
	return r.queries.SetIngestionLogDuplicateOfID(ctx, postgres.SetIngestionLogDuplicateOfIDParams{
		ID:            duplicateID,
		DuplicateOfID: clearedRef,
	})
}

// RetrieveIngestionLogDuplicateOfID retrieves the duplicate ID for an ingestion log.
// It returns (nil, nil) when the log is a primary (duplicate_of_id is NULL),
// and the primary's ID when the log is a duplicate.
func (r *Repository) RetrieveIngestionLogDuplicateOfID(ctx context.Context, id int32) (*int32, error) {
	dupOf, err := r.queries.RetrieveIngestionLogDuplicateOfID(ctx, id)
	if err != nil {
		return nil, err
	}

	if !dupOf.Valid {
		// NULL duplicate_of_id: the log itself is a primary.
		return nil, nil
	}

	return &dupOf.Int32, nil
}
63 changes: 63 additions & 0 deletions diode-server/entityhash/entityhash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package entityhash

import (
"crypto/sha256"
"encoding/hex"
"fmt"

"github.com/gowebpki/jcs"
"google.golang.org/protobuf/encoding/protojson"

diodepb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
)

// EntityFingerprinter provides methods for generating consistent hashes of entities
type EntityFingerprinter struct {
	// marshaler holds the protojson options used for every serialization so
	// all callers produce byte-identical JSON for equal entities (snake_case
	// names, unpopulated fields omitted — configured in NewEntityFingerprinter).
	marshaler protojson.MarshalOptions
}

// NewEntityFingerprinter creates a new entity fingerprinter with consistent options
func NewEntityFingerprinter() *EntityFingerprinter {
	opts := protojson.MarshalOptions{
		UseProtoNames:   true,  // snake_case field names for consistency
		EmitUnpopulated: false, // omit zero/empty values from the JSON
	}
	return &EntityFingerprinter{marshaler: opts}
}

// GenerateEntityHash creates a SHA256 hash for an entity that includes the object type
// and canonicalized entity content, excluding the timestamp from the outer envelope,
// so re-ingestions of the same payload at different times hash identically.
// Returns an error for a nil entity or nil inner content.
func (f *EntityFingerprinter) GenerateEntityHash(entity *diodepb.Entity) (string, error) {
	if entity == nil {
		return "", fmt.Errorf("entity cannot be nil")
	}

	inner := entity.GetEntity()
	if inner == nil {
		return "", fmt.Errorf("entity content cannot be nil")
	}

	// Re-wrap only the inner content so envelope-level fields (e.g. the
	// timestamp) are excluded from the serialized form.
	payload, err := f.marshaler.Marshal(&diodepb.Entity{Entity: inner})
	if err != nil {
		return "", fmt.Errorf("failed to marshal entity: %w", err)
	}

	return f.GenerateEntityHashFromJSON(payload)
}

// GenerateEntityHashFromJSON creates a SHA256 hash for an entity.
// The input JSON should already exclude the timestamp from the entity envelope.
// Returns the lowercase hex digest, or an error if the JSON cannot be canonicalized.
func (f *EntityFingerprinter) GenerateEntityHashFromJSON(entityJSON []byte) (string, error) {
	// RFC 8785 (JCS) canonicalization: semantically equal JSON yields
	// byte-identical output regardless of key order or whitespace.
	canonical, err := jcs.Transform(entityJSON)
	if err != nil {
		return "", fmt.Errorf("failed to canonicalize JSON: %w", err)
	}

	digest := sha256.Sum256(canonical)
	return hex.EncodeToString(digest[:]), nil
}
Loading
Loading