diff --git a/pkg/cantonsdk/streaming/builder.go b/pkg/cantonsdk/streaming/builder.go
new file mode 100644
index 00000000..d1421371
--- /dev/null
+++ b/pkg/cantonsdk/streaming/builder.go
@@ -0,0 +1,68 @@
+package streaming
+
+import (
+	"time"
+
+	lapiv2 "github.com/chainsafe/canton-middleware/pkg/cantonsdk/lapi/v2"
+)
+
+// FieldValue is an opaque DAML value used to construct LedgerEvents.
+// Use the Make* functions below to create values of each DAML type.
+// This keeps callers free of any direct lapiv2 dependency.
+type FieldValue struct{ v *lapiv2.Value }
+
+// MakeTextField wraps a Go string as a DAML Text value.
+func MakeTextField(s string) FieldValue {
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_Text{Text: s}}}
+}
+
+// MakePartyField wraps a party ID string as a DAML Party value.
+func MakePartyField(s string) FieldValue {
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_Party{Party: s}}}
+}
+
+// MakeNumericField wraps a decimal string as a DAML Numeric value.
+func MakeNumericField(s string) FieldValue {
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_Numeric{Numeric: s}}}
+}
+
+// MakeTimestampField wraps a time.Time as a DAML Timestamp value.
+// DAML timestamps are microseconds since the Unix epoch, hence UnixMicro.
+func MakeTimestampField(t time.Time) FieldValue {
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_Timestamp{Timestamp: t.UnixMicro()}}}
+}
+
+// MakeNoneField returns a DAML Optional None value.
+// An Optional with no inner Value encodes None.
+func MakeNoneField() FieldValue {
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_Optional{Optional: &lapiv2.Optional{}}}}
+}
+
+// MakeSomePartyField returns a DAML Optional(Party) Some value.
+func MakeSomePartyField(party string) FieldValue {
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_Optional{Optional: &lapiv2.Optional{
+		Value: &lapiv2.Value{Sum: &lapiv2.Value_Party{Party: party}},
+	}}}}
+}
+
+// MakeRecordField builds a DAML Record value from a map of sub-fields.
+// NOTE(review): map iteration order is random, so the resulting field order
+// is nondeterministic; fine for label-based lookup, not for positional access.
+func MakeRecordField(fields map[string]FieldValue) FieldValue {
+	rf := make([]*lapiv2.RecordField, 0, len(fields))
+	for k, v := range fields {
+		rf = append(rf, &lapiv2.RecordField{Label: k, Value: v.v})
+	}
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_Record{Record: &lapiv2.Record{Fields: rf}}}}
+}
+
+// MakeSomeRecordField wraps a record in a DAML Optional(Record) Some value.
+func MakeSomeRecordField(fields map[string]FieldValue) FieldValue {
+	inner := MakeRecordField(fields)
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_Optional{Optional: &lapiv2.Optional{Value: inner.v}}}}
+}
+
+// MakeTextMapField builds a DAML TextMap value from a Go string map.
+func MakeTextMapField(entries map[string]string) FieldValue {
+	es := make([]*lapiv2.TextMap_Entry, 0, len(entries))
+	for k, v := range entries {
+		es = append(es, &lapiv2.TextMap_Entry{Key: k, Value: &lapiv2.Value{Sum: &lapiv2.Value_Text{Text: v}}})
+	}
+	return FieldValue{&lapiv2.Value{Sum: &lapiv2.Value_TextMap{TextMap: &lapiv2.TextMap{Entries: es}}}}
+}
diff --git a/pkg/cantonsdk/streaming/client.go b/pkg/cantonsdk/streaming/client.go
new file mode 100644
index 00000000..7f2f0a21
--- /dev/null
+++ b/pkg/cantonsdk/streaming/client.go
@@ -0,0 +1,262 @@
+package streaming
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"sync/atomic"
+	"time"
+
+	lapiv2 "github.com/chainsafe/canton-middleware/pkg/cantonsdk/lapi/v2"
+	"github.com/chainsafe/canton-middleware/pkg/cantonsdk/ledger"
+	"github.com/chainsafe/canton-middleware/pkg/cantonsdk/values"
+
+	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	// reconnectBaseDelay/reconnectMaxDelay bound the exponential backoff
+	// used between stream reconnect attempts (5s doubling up to 60s).
+	reconnectBaseDelay = 5 * time.Second
+	reconnectMaxDelay  = 60 * time.Second
+	// txChannelCap is the buffer size of the channels returned by Subscribe.
+	txChannelCap = 100
+)
+
+// Streamer is the interface for opening a live Canton ledger stream.
+// *Client satisfies this interface.
+type Streamer interface {
+	Subscribe(ctx context.Context, req SubscribeRequest, lastOffset *int64) <-chan *LedgerTransaction
+}
+
+// Client wraps UpdateService.GetUpdates with automatic reconnection and auth handling.
+// It mirrors the streaming pattern established in pkg/cantonsdk/bridge/client.go.
+type Client struct {
+	ledger ledger.Ledger
+	party  string
+	logger *zap.Logger
+}
+
+// New creates a new streaming Client for the given ledger and party.
+func New(l ledger.Ledger, party string, opts ...Option) *Client {
+	s := applyOptions(opts)
+	return &Client{
+		ledger: l,
+		party:  party,
+		logger: s.logger,
+	}
+}
+
+// Subscribe opens a live stream against the Canton ledger and returns a read-only channel
+// of decoded transactions. It reconnects automatically with exponential backoff (5s → 60s)
+// on transient errors, and invalidates the auth token on 401/403.
+//
+// lastOffset is updated atomically after each received transaction so that reconnects
+// resume from the last safely received point. It is seeded with req.FromOffset before
+// the first connection attempt. The caller is responsible for persisting lastOffset
+// to the database (the processor does this atomically with event writes).
+//
+// The returned channel is closed when ctx is canceled or a terminal error occurs
+// (io.EOF, context cancellation).
+func (c *Client) Subscribe(
+	ctx context.Context,
+	req SubscribeRequest,
+	lastOffset *int64,
+) <-chan *LedgerTransaction {
+	out := make(chan *LedgerTransaction, txChannelCap)
+
+	// Seed the resume point with the requested start offset. Without this, a
+	// disconnect before the first transaction is delivered would rewind
+	// req.FromOffset to the pointer's stale initial value (e.g. 0) below and
+	// re-deliver already-processed history on reconnect.
+	atomic.StoreInt64(lastOffset, req.FromOffset)
+
+	go func() {
+		defer close(out)
+
+		reconnectDelay := reconnectBaseDelay
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+
+			err := c.runStream(ctx, req.FromOffset, req.TemplateIDs, lastOffset, out)
+			if err == nil || errors.Is(err, io.EOF) || ctx.Err() != nil {
+				return
+			}
+
+			if isAuthError(err) {
+				c.ledger.InvalidateToken()
+				reconnectDelay = reconnectBaseDelay
+			}
+
+			// Advance FromOffset to where the stream last successfully delivered a
+			// transaction so the next connection resumes from the correct position.
+			resume := atomic.LoadInt64(lastOffset)
+			if resume > req.FromOffset {
+				// The previous session made progress, so the connection was
+				// healthy before it dropped: restart the backoff schedule
+				// instead of compounding delays across the client's lifetime.
+				reconnectDelay = reconnectBaseDelay
+			}
+			req.FromOffset = resume
+
+			c.logger.Warn("canton stream disconnected, reconnecting",
+				zap.Error(err),
+				zap.Int64("resume_offset", req.FromOffset),
+				zap.Duration("backoff", reconnectDelay),
+			)
+
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(reconnectDelay):
+			}
+
+			reconnectDelay = min(reconnectDelay*2, reconnectMaxDelay)
+		}
+	}()
+
+	return out
+}
+
+// runStream opens a single GetUpdates stream and forwards transactions to out until
+// the stream ends or ctx is canceled. It updates lastOffset atomically on each
+// received transaction.
+func (c *Client) runStream(
+	ctx context.Context,
+	fromOffset int64,
+	templateIDs []TemplateID,
+	lastOffset *int64,
+	out chan<- *LedgerTransaction,
+) error {
+	// AuthContext attaches the current auth token; derived from ctx, so
+	// canceling ctx also tears down the gRPC stream.
+	authCtx := c.ledger.AuthContext(ctx)
+
+	stream, err := c.ledger.Update().GetUpdates(authCtx, &lapiv2.GetUpdatesRequest{
+		BeginExclusive: fromOffset,
+		UpdateFormat: &lapiv2.UpdateFormat{
+			IncludeTransactions: &lapiv2.TransactionFormat{
+				EventFormat: &lapiv2.EventFormat{
+					FiltersByParty: map[string]*lapiv2.Filters{
+						c.party: buildTemplateFilters(templateIDs),
+					},
+					Verbose: true,
+				},
+				TransactionShape: lapiv2.TransactionShape_TRANSACTION_SHAPE_ACS_DELTA,
+			},
+		},
+	})
+	if err != nil {
+		if isAuthError(err) {
+			c.ledger.InvalidateToken()
+		}
+		return fmt.Errorf("open canton stream: %w", err)
+	}
+
+	for {
+		resp, err := stream.Recv()
+		if err != nil {
+			if isAuthError(err) {
+				c.ledger.InvalidateToken()
+			}
+			return err
+		}
+
+		tx := resp.GetTransaction()
+		if tx == nil {
+			// Checkpoint or topology update — nothing to index.
+			continue
+		}
+
+		lt := decodeLedgerTransaction(tx)
+		// Record the offset before the (possibly blocking) send so reconnects
+		// resume past this transaction; the buffered out channel means the
+		// consumer may still be behind — callers persist offsets themselves.
+		atomic.StoreInt64(lastOffset, lt.Offset)
+
+		select {
+		case out <- lt:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+// buildTemplateFilters constructs the Filters value for a set of TemplateIDs.
+//
+// This is the gRPC-level (template-level) filter. It controls which contract types
+// are delivered by the Canton Ledger API — reducing bandwidth to only the requested
+// templates. It is NOT an instrument filter: it cannot filter by contract field values
+// such as instrumentId. Instrument filtering (by InstrumentKey{Admin, ID}) happens
+// downstream in the parser after events are received.
+//
+// When TemplateID.PackageID is empty the filter matches the template across all
+// deployed package versions. Setting PackageID="" for CIP56.Events.TokenTransferEvent
+// enables indexing of any third-party CIP56-compliant token regardless of which
+// package version it was deployed with.
+func buildTemplateFilters(templateIDs []TemplateID) *lapiv2.Filters {
+	cumulative := make([]*lapiv2.CumulativeFilter, 0, len(templateIDs))
+	for _, tid := range templateIDs {
+		cumulative = append(cumulative, &lapiv2.CumulativeFilter{
+			IdentifierFilter: &lapiv2.CumulativeFilter_TemplateFilter{
+				TemplateFilter: &lapiv2.TemplateFilter{
+					TemplateId: &lapiv2.Identifier{
+						PackageId:  tid.PackageID, // empty = match all package versions
+						ModuleName: tid.ModuleName,
+						EntityName: tid.EntityName,
+					},
+				},
+			},
+		})
+	}
+	return &lapiv2.Filters{Cumulative: cumulative}
+}
+
+// decodeLedgerTransaction converts a proto Transaction into a LedgerTransaction.
+// Events that decodeLedgerEvent returns nil for (e.g. exercised) are dropped.
+func decodeLedgerTransaction(tx *lapiv2.Transaction) *LedgerTransaction {
+	lt := &LedgerTransaction{
+		UpdateID:      tx.GetUpdateId(),
+		Offset:        tx.GetOffset(),
+		EffectiveTime: tx.GetEffectiveAt().AsTime(),
+		Events:        make([]*LedgerEvent, 0, len(tx.Events)),
+	}
+	for _, ev := range tx.Events {
+		if le := decodeLedgerEvent(ev); le != nil {
+			lt.Events = append(lt.Events, le)
+		}
+	}
+	return lt
+}
+
+// decodeLedgerEvent converts a proto Event to a LedgerEvent.
+// For created events the DAML CreateArguments are pre-decoded into LedgerEvent.fields
+// so that callers never need to import lapiv2 directly.
+// Returns nil for event kinds the indexer does not process (e.g. exercised events).
+func decodeLedgerEvent(ev *lapiv2.Event) *LedgerEvent {
+	if created := ev.GetCreated(); created != nil {
+		le := &LedgerEvent{
+			ContractID: created.GetContractId(),
+			IsCreated:  true,
+			fields:     values.RecordToMap(created.GetCreateArguments()),
+		}
+		if tid := created.GetTemplateId(); tid != nil {
+			le.PackageID = tid.GetPackageId()
+			le.ModuleName = tid.GetModuleName()
+			le.TemplateName = tid.GetEntityName()
+		}
+		return le
+	}
+
+	if archived := ev.GetArchived(); archived != nil {
+		// Archived events carry no create arguments, so fields stays nil;
+		// the typed accessors on LedgerEvent all tolerate a nil map.
+		le := &LedgerEvent{
+			ContractID: archived.GetContractId(),
+			IsCreated:  false,
+		}
+		if tid := archived.GetTemplateId(); tid != nil {
+			le.PackageID = tid.GetPackageId()
+			le.ModuleName = tid.GetModuleName()
+			le.TemplateName = tid.GetEntityName()
+		}
+		return le
+	}
+
+	return nil
+}
+
+// isAuthError returns true if err signals authentication or authorisation failure.
+func isAuthError(err error) bool {
+	if err == nil {
+		return false
+	}
+	st, ok := status.FromError(err)
+	if !ok {
+		return false
+	}
+	return st.Code() == codes.Unauthenticated || st.Code() == codes.PermissionDenied
+}
diff --git a/pkg/cantonsdk/streaming/options.go b/pkg/cantonsdk/streaming/options.go
new file mode 100644
index 00000000..56e6eb84
--- /dev/null
+++ b/pkg/cantonsdk/streaming/options.go
@@ -0,0 +1,25 @@
+package streaming
+
+import "go.uber.org/zap"
+
+// Option configures a streaming Client.
+type Option func(*settings)
+
+// settings holds the resolved option values applied by New.
+type settings struct {
+	logger *zap.Logger
+}
+
+// WithLogger sets a custom logger on the streaming Client.
+func WithLogger(l *zap.Logger) Option {
+	return func(s *settings) { s.logger = l }
+}
+
+// applyOptions folds opts over the defaults (no-op logger).
+// nil Options are skipped so callers may pass conditionally-built slices.
+func applyOptions(opts []Option) settings {
+	s := settings{logger: zap.NewNop()}
+	for _, opt := range opts {
+		if opt != nil {
+			opt(&s)
+		}
+	}
+	return s
+}
diff --git a/pkg/cantonsdk/streaming/stream.go b/pkg/cantonsdk/streaming/stream.go
new file mode 100644
index 00000000..dd150b8c
--- /dev/null
+++ b/pkg/cantonsdk/streaming/stream.go
@@ -0,0 +1,62 @@
+package streaming
+
+import "context"
+
+// Batch carries decoded items from one LedgerTransaction, preserving the
+// transaction boundary for atomic offset writes.
+type Batch[T any] struct {
+	Offset   int64
+	UpdateID string
+	Items    []T
+}
+
+// Stream[T] wraps a Streamer and applies a per-event decode function.
+// Use when subscribing to a single homogeneous template.
+type Stream[T any] struct {
+	streamer Streamer
+	decode   func(*LedgerTransaction, *LedgerEvent) (T, bool)
+}
+
+// NewStream creates a Stream[T] that decodes events using the provided function.
+// decode returns (item, true) to keep an event, (_, false) to drop it.
+func NewStream[T any](streamer Streamer, decode func(*LedgerTransaction, *LedgerEvent) (T, bool)) *Stream[T] {
+	return &Stream[T]{streamer: streamer, decode: decode}
+}
+
+// Subscribe passes lastOffset to streamer.Subscribe, iterates each tx's events
+// through decode, and emits *Batch[T] for every tx. Items may be empty —
+// offset must still advance for no-op transactions.
+func (s *Stream[T]) Subscribe(ctx context.Context, req SubscribeRequest, lastOffset *int64) <-chan *Batch[T] {
+	txCh := s.streamer.Subscribe(ctx, req, lastOffset)
+	out := make(chan *Batch[T], txChannelCap)
+
+	go func() {
+		defer close(out)
+		for {
+			select {
+			case tx, ok := <-txCh:
+				if !ok {
+					// Upstream closed (terminal error or ctx canceled).
+					return
+				}
+				batch := &Batch[T]{
+					Offset:   tx.Offset,
+					UpdateID: tx.UpdateID,
+					Items:    make([]T, 0, len(tx.Events)),
+				}
+				for _, ev := range tx.Events {
+					if item, ok := s.decode(tx, ev); ok {
+						batch.Items = append(batch.Items, item)
+					}
+				}
+				// Empty batches are forwarded on purpose: the consumer must
+				// still persist the offset for transactions with no items.
+				select {
+				case out <- batch:
+				case <-ctx.Done():
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return out
+}
diff --git a/pkg/cantonsdk/streaming/types.go b/pkg/cantonsdk/streaming/types.go
new file mode 100644
index 00000000..7b869441
--- /dev/null
+++ b/pkg/cantonsdk/streaming/types.go
@@ -0,0 +1,138 @@
+// Package streaming provides a reusable, generic Canton ledger streaming client.
+//
+// It wraps UpdateService.GetUpdates with automatic reconnection, exponential backoff,
+// and auth-token invalidation on 401.
+package streaming
+
+import (
+	"time"
+
+	lapiv2 "github.com/chainsafe/canton-middleware/pkg/cantonsdk/lapi/v2"
+	"github.com/chainsafe/canton-middleware/pkg/cantonsdk/values"
+)
+
+// TemplateID identifies a DAML template by its package, module, and entity name.
+// It is the streaming package's own type — callers do not import lapiv2 directly.
+type TemplateID struct {
+	PackageID  string
+	ModuleName string
+	EntityName string
+}
+
+// SubscribeRequest configures which templates to stream and from which ledger offset.
+type SubscribeRequest struct {
+	// FromOffset is the exclusive start offset. Use 0 to start from the beginning.
+	FromOffset int64
+
+	// TemplateIDs lists the DAML templates to subscribe to.
+	TemplateIDs []TemplateID
+}
+
+// LedgerTransaction is a decoded transaction received from the Canton GetUpdates stream.
+type LedgerTransaction struct {
+	UpdateID      string
+	Offset        int64
+	EffectiveTime time.Time
+	Events        []*LedgerEvent
+}
+
+// LedgerEvent is a single created or archived contract event within a transaction.
+// All DAML field access goes through typed accessor methods — no lapiv2 types are exposed.
+type LedgerEvent struct {
+	ContractID   string
+	PackageID    string
+	ModuleName   string
+	TemplateName string
+
+	// IsCreated is true for contract create events, false for archive events.
+	IsCreated bool
+
+	// fields holds the pre-decoded DAML record from CreateArguments, keyed by field label.
+	// Only populated for created events; nil for archived events.
+	fields map[string]*lapiv2.Value
+}
+
+// NewLedgerEvent constructs a LedgerEvent with pre-decoded fields.
+// Used by tests that need to build events without going through the proto decode path.
+// Accepts FieldValue values produced by the Make* constructor functions so that
+// callers have no direct dependency on lapiv2.
+func NewLedgerEvent(contractID, packageID, moduleName, templateName string, isCreated bool, fields map[string]FieldValue) *LedgerEvent {
+	inner := make(map[string]*lapiv2.Value, len(fields))
+	for k, v := range fields {
+		inner[k] = v.v
+	}
+	return &LedgerEvent{
+		ContractID:   contractID,
+		PackageID:    packageID,
+		ModuleName:   moduleName,
+		TemplateName: templateName,
+		IsCreated:    isCreated,
+		fields:       inner,
+	}
+}
+
+// TextField returns the named DAML Text field as a Go string.
+// Returns "" when the field is absent or not of type Text.
+func (e *LedgerEvent) TextField(name string) string {
+	return values.Text(e.fields[name])
+}
+
+// PartyField returns the named DAML Party field as a string.
+// Returns "" when the field is absent or not of type Party.
+func (e *LedgerEvent) PartyField(name string) string {
+	return values.Party(e.fields[name])
+}
+
+// NumericField returns the named DAML Numeric field as a decimal string (e.g. "1.5").
+// Returns "0" when the field is absent or not of type Numeric.
+func (e *LedgerEvent) NumericField(name string) string {
+	return values.Numeric(e.fields[name])
+}
+
+// OptionalTextField returns the inner Text value of a DAML Optional Text field.
+// Returns "" for None or when the field is absent.
+func (e *LedgerEvent) OptionalTextField(name string) string {
+	return values.OptionalText(e.fields[name])
+}
+
+// OptionalPartyField returns the inner Party value of a DAML Optional Party field.
+// Returns "" for None or when the field is absent.
+func (e *LedgerEvent) OptionalPartyField(name string) string {
+	return values.OptionalParty(e.fields[name])
+}
+
+// IsNone returns true if the named DAML Optional field holds None.
+func (e *LedgerEvent) IsNone(name string) bool {
+	return values.IsNone(e.fields[name])
+}
+
+// TimestampField returns the named DAML Time field as a Go time.Time.
+// Returns zero time when the field is absent or not of type Timestamp.
+func (e *LedgerEvent) TimestampField(name string) time.Time {
+	return values.Timestamp(e.fields[name])
+}
+
+// NestedTextField accesses a Text sub-field inside a named DAML Record field.
+// Example: event.NestedTextField("instrumentId", "id")
+// Returns "" when the outer field is absent or not a Record.
+func (e *LedgerEvent) NestedTextField(record, field string) string {
+	return values.NestedTextField(e.fields[record], field)
+}
+
+// NestedPartyField accesses a Party sub-field inside a named DAML Record field.
+// Example: event.NestedPartyField("instrumentId", "admin")
+// Returns "" when the outer field is absent or not a Record.
+func (e *LedgerEvent) NestedPartyField(record, field string) string {
+	return values.NestedPartyField(e.fields[record], field)
+}
+
+// OptionalMetaLookup looks up a string key within an Optional Metadata field.
+// Metadata is encoded as Optional(Record{values: Map Text Text}).
+// Returns "" when the Optional is None, the key is absent, or the field is absent.
+func (e *LedgerEvent) OptionalMetaLookup(metaField, key string) string {
+	inner := values.OptionalRecordFields(e.fields[metaField])
+	if inner == nil {
+		return ""
+	}
+	return values.MapLookupText(inner["values"], key)
+}
diff --git a/pkg/cantonsdk/values/decode.go b/pkg/cantonsdk/values/decode.go
index 2abb1fed..3844186e 100644
--- a/pkg/cantonsdk/values/decode.go
+++ b/pkg/cantonsdk/values/decode.go
@@ -17,6 +17,18 @@ func Text(v *lapiv2.Value) string {
 	return ""
 }
 
+// TextOK extracts a text value and reports whether the type matched.
+func TextOK(v *lapiv2.Value) (string, bool) {
+	if v == nil {
+		return "", false
+	}
+	t, ok := v.Sum.(*lapiv2.Value_Text)
+	if !ok {
+		return "", false
+	}
+	return t.Text, true
+}
+
 // Party extracts a party value.
 func Party(v *lapiv2.Value) string {
 	if v == nil {
@@ -28,6 +40,18 @@ func Party(v *lapiv2.Value) string {
 	return ""
 }
 
+// PartyOK extracts a party value and reports whether the type matched.
+func PartyOK(v *lapiv2.Value) (string, bool) {
+	if v == nil {
+		return "", false
+	}
+	p, ok := v.Sum.(*lapiv2.Value_Party)
+	if !ok {
+		return "", false
+	}
+	return p.Party, true
+}
+
 // PartyList extracts list of parties.
 func PartyList(v *lapiv2.Value) []string {
 	if v == nil {
@@ -56,6 +80,18 @@ func Numeric(v *lapiv2.Value) string {
 	return "0"
 }
 
+// NumericOK extracts a numeric value and reports whether the type matched.
+// Unlike Numeric (which falls back to "0"), the failure value here is "".
+func NumericOK(v *lapiv2.Value) (string, bool) {
+	if v == nil {
+		return "", false
+	}
+	n, ok := v.Sum.(*lapiv2.Value_Numeric)
+	if !ok {
+		return "", false
+	}
+	return n.Numeric, true
+}
+
 // ContractID extracts a contract ID value.
 func ContractID(v *lapiv2.Value) string {
 	if v == nil {
@@ -111,3 +147,94 @@ func Timestamp(v *lapiv2.Value) time.Time {
 	}
 	return time.Time{}
 }
+
+// TimestampOK extracts a timestamp and reports whether the type matched.
+func TimestampOK(v *lapiv2.Value) (time.Time, bool) {
+	if v == nil {
+		return time.Time{}, false
+	}
+	t, ok := v.Sum.(*lapiv2.Value_Timestamp)
+	if !ok {
+		return time.Time{}, false
+	}
+	return time.UnixMicro(t.Timestamp), true
+}
+
+// RecordField extracts a named field from a Record value, returning the sub-map.
+// Returns nil when v is nil or not a Record.
+func RecordField(v *lapiv2.Value) map[string]*lapiv2.Value {
+	if v == nil {
+		return nil
+	}
+	r, ok := v.Sum.(*lapiv2.Value_Record)
+	if !ok || r.Record == nil {
+		return nil
+	}
+	return RecordToMap(r.Record)
+}
+
+// NestedTextField accesses a Text field within a nested DAML Record value.
+// Use this for fields like instrumentId.id where instrumentId is a Record.
+// Returns "" when v is nil, not a Record, or the field is absent.
+func NestedTextField(v *lapiv2.Value, field string) string {
+	return Text(RecordField(v)[field])
+}
+
+// NestedTextFieldOK accesses a Text field within a nested DAML Record value and
+// reports whether the lookup succeeded with the right type.
+func NestedTextFieldOK(v *lapiv2.Value, field string) (string, bool) {
+	return TextOK(RecordField(v)[field])
+}
+
+// NestedPartyField accesses a Party field within a nested DAML Record value.
+// Use this for fields like instrumentId.admin.
+// Returns "" when v is nil, not a Record, or the field is absent.
+func NestedPartyField(v *lapiv2.Value, field string) string {
+	return Party(RecordField(v)[field])
+}
+
+// NestedPartyFieldOK accesses a Party field within a nested DAML Record value and
+// reports whether the lookup succeeded with the right type.
+func NestedPartyFieldOK(v *lapiv2.Value, field string) (string, bool) {
+	return PartyOK(RecordField(v)[field])
+}
+
+// OptionalRecordFields extracts the inner Record fields from an Optional(Record) value.
+// Returns nil when v is None or the inner value is not a Record.
+func OptionalRecordFields(v *lapiv2.Value) map[string]*lapiv2.Value {
+	if IsNone(v) {
+		return nil
+	}
+	opt, ok := v.Sum.(*lapiv2.Value_Optional)
+	if !ok || opt.Optional == nil || opt.Optional.Value == nil {
+		return nil
+	}
+	return RecordField(opt.Optional.Value)
+}
+
+// MapLookupText looks up a string key in a DAML Map Text Text value.
+// Handles both TextMap (DA.TextMap) and GenMap (DA.Map) encodings.
+// Returns "" when v is nil, not a map, or the key is absent.
+func MapLookupText(v *lapiv2.Value, key string) string {
+	if v == nil {
+		return ""
+	}
+	// DA.TextMap.TextMap serializes as Value_TextMap
+	if tm, ok := v.Sum.(*lapiv2.Value_TextMap); ok && tm.TextMap != nil {
+		for _, e := range tm.TextMap.Entries {
+			if e.GetKey() == key {
+				return Text(e.GetValue())
+			}
+		}
+		return ""
+	}
+	// DA.Map.Map serializes as Value_GenMap with Text keys
+	if gm, ok := v.Sum.(*lapiv2.Value_GenMap); ok && gm.GenMap != nil {
+		for _, e := range gm.GenMap.Entries {
+			if Text(e.GetKey()) == key {
+				return Text(e.GetValue())
+			}
+		}
+	}
+	return ""
+}
diff --git a/pkg/indexer/engine/decoder.go b/pkg/indexer/engine/decoder.go
new file mode 100644
index 00000000..aef78533
--- /dev/null
+++ b/pkg/indexer/engine/decoder.go
@@ -0,0 +1,127 @@
+package engine
+
+import (
+	"github.com/chainsafe/canton-middleware/pkg/cantonsdk/streaming"
+	"github.com/chainsafe/canton-middleware/pkg/indexer"
+
+	"go.uber.org/zap"
+)
+
+const (
+	tokenTransferEventModule = "CIP56.Events"
+	tokenTransferEventEntity = "TokenTransferEvent"
+
+	// Metadata keys for bridge context stored in TokenTransferEvent.meta.values.
+	metaKeyExternalTxID    = "bridge.externalTxId"
+	metaKeyExternalAddress = "bridge.externalAddress"
+	metaKeyFingerprint     = "bridge.fingerprint"
+)
+
+// NewTokenTransferDecoder returns a decode function for use with streaming.NewStream.
+//
+// The closure:
+//   - skips archived events
+//   - checks ModuleName == "CIP56.Events" && TemplateName == "TokenTransferEvent"
+//   - applies the FilterModeWhitelist instrument check when mode is FilterModeWhitelist
+//   - extracts all fields into a *ParsedEvent
+//   - returns nil, false for invalid events (both parties absent, filter miss)
+func NewTokenTransferDecoder(
+	mode indexer.FilterMode,
+	allowed []indexer.InstrumentKey,
+	logger *zap.Logger,
+) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.ParsedEvent, bool) {
+	// Build the whitelist set once; the closure consults it per event.
+	allowedMap := make(map[indexer.InstrumentKey]struct{}, len(allowed))
+	for _, k := range allowed {
+		allowedMap[k] = struct{}{}
+	}
+
+	return func(tx *streaming.LedgerTransaction, ev *streaming.LedgerEvent) (*indexer.ParsedEvent, bool) {
+		if !ev.IsCreated {
+			return nil, false
+		}
+		if ev.ModuleName != tokenTransferEventModule || ev.TemplateName != tokenTransferEventEntity {
+			return nil, false
+		}
+
+		instrumentID := ev.NestedTextField("instrumentId", "id")
+		instrumentAdmin := ev.NestedPartyField("instrumentId", "admin")
+		key := indexer.InstrumentKey{Admin: instrumentAdmin, ID: instrumentID}
+
+		if mode == indexer.FilterModeWhitelist {
+			if _, ok := allowedMap[key]; !ok {
+				logger.Debug("skipping event for unlisted instrument",
+					zap.String("instrument_id", instrumentID),
+					zap.String("instrument_admin", instrumentAdmin),
+					zap.String("contract_id", ev.ContractID),
+				)
+				return nil, false
+			}
+		}
+
+		// Optional parties: nil pointer means the DAML Optional was None
+		// (or held an empty party string).
+		var fromPartyID *string
+		if !ev.IsNone("fromParty") {
+			v := ev.OptionalPartyField("fromParty")
+			if v != "" {
+				fromPartyID = &v
+			}
+		}
+
+		var toPartyID *string
+		if !ev.IsNone("toParty") {
+			v := ev.OptionalPartyField("toParty")
+			if v != "" {
+				toPartyID = &v
+			}
+		}
+
+		// Bridge context travels in meta.values; absent keys stay nil.
+		var externalTxID *string
+		if v := ev.OptionalMetaLookup("meta", metaKeyExternalTxID); v != "" {
+			externalTxID = &v
+		}
+
+		var externalAddress *string
+		if v := ev.OptionalMetaLookup("meta", metaKeyExternalAddress); v != "" {
+			externalAddress = &v
+		}
+
+		var fingerprint *string
+		if v := ev.OptionalMetaLookup("meta", metaKeyFingerprint); v != "" {
+			fingerprint = &v
+		}
+
+		// Event type is derived from which parties are present:
+		// mint = no sender, burn = no recipient, transfer = both.
+		var et indexer.EventType
+		switch {
+		case fromPartyID == nil && toPartyID != nil:
+			et = indexer.EventMint
+		case fromPartyID != nil && toPartyID == nil:
+			et = indexer.EventBurn
+		case fromPartyID != nil && toPartyID != nil:
+			et = indexer.EventTransfer
+		default:
+			logger.Warn("dropping TokenTransferEvent with both parties absent",
+				zap.String("contract_id", ev.ContractID),
+				zap.String("tx_id", tx.UpdateID),
+				zap.String("instrument_id", instrumentID),
+			)
+			return nil, false
+		}
+
+		return &indexer.ParsedEvent{
+			InstrumentID:    instrumentID,
+			InstrumentAdmin: instrumentAdmin,
+			Issuer:          ev.PartyField("issuer"),
+			EventType:       et,
+			Amount:          ev.NumericField("amount"),
+			FromPartyID:     fromPartyID,
+			ToPartyID:       toPartyID,
+			ExternalTxID:    externalTxID,
+			ExternalAddress: externalAddress,
+			Fingerprint:     fingerprint,
+			ContractID:      ev.ContractID,
+			TxID:            tx.UpdateID,
+			LedgerOffset:    tx.Offset,
+			Timestamp:       ev.TimestampField("timestamp"),
+			EffectiveTime:   tx.EffectiveTime,
+		}, true
+	}
+}
diff --git a/pkg/indexer/engine/decoder_test.go b/pkg/indexer/engine/decoder_test.go
new file mode 100644
index 00000000..65e5cce2
--- /dev/null
+++ b/pkg/indexer/engine/decoder_test.go
@@ -0,0 +1,248 @@
+package engine
+
+import (
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/chainsafe/canton-middleware/pkg/cantonsdk/streaming"
+	"github.com/chainsafe/canton-middleware/pkg/indexer"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+)
+
+// ---------------------------------------------------------------------------
+// Shared test constants (accessible from processor_test.go, same package)
+// ---------------------------------------------------------------------------
+
+const (
+	testContractID      = "contract-id-1"
+	testInstrumentID    = "DEMO"
+	testInstrumentAdmin = "issuer-party::abc123"
+	testIssuer          = "issuer-party::abc123"
+	testAmount          = "100.000000000000000000"
+	testRecipient       = "recipient-party::def456"
+	testSender          = "sender-party::ghi789"
+)
+
+// ---------------------------------------------------------------------------
+// Test event / transaction builders
+// ---------------------------------------------------------------------------
+
+// makeTransferEvent builds a created TokenTransferEvent with standard instrument,
+// issuer, amount, timestamp, and meta=None; extra overrides/augments fields.
+func makeTransferEvent(contractID string, fromParty, toParty streaming.FieldValue, extra map[string]streaming.FieldValue) *streaming.LedgerEvent {
+	fields := map[string]streaming.FieldValue{
+		"instrumentId": streaming.MakeRecordField(map[string]streaming.FieldValue{
+			"id":    streaming.MakeTextField(testInstrumentID),
+			"admin": streaming.MakePartyField(testInstrumentAdmin),
+		}),
+		"issuer":    streaming.MakePartyField(testIssuer),
+		"fromParty": fromParty,
+		"toParty":   toParty,
+		"amount":    streaming.MakeNumericField(testAmount),
+		"timestamp": streaming.MakeTimestampField(time.Unix(1_700_000_000, 0)),
+		"meta":      streaming.MakeNoneField(),
+	}
+	for k, v := range extra {
+		fields[k] = v
+	}
+	return streaming.NewLedgerEvent(contractID, "pkg-id", tokenTransferEventModule, tokenTransferEventEntity, true, fields)
+}
+
+// makeTx wraps events in a LedgerTransaction with a deterministic update ID.
+func makeTx(offset int64, events ...*streaming.LedgerEvent) *streaming.LedgerTransaction {
+	return &streaming.LedgerTransaction{
+		// strconv keeps the ID a readable decimal for any offset;
+		// string(rune('0'+offset)) yielded non-digit runes (':', ';', …)
+		// for offsets above 9.
+		UpdateID:      "update-" + strconv.FormatInt(offset, 10),
+		Offset:        offset,
+		EffectiveTime: time.Unix(1_700_000_000, 0),
+		Events:        events,
+	}
+}
+
+// decodeAll applies decode to every event in tx and collects successful results.
+func decodeAll(decode func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.ParsedEvent, bool), tx *streaming.LedgerTransaction) []*indexer.ParsedEvent {
+	var out []*indexer.ParsedEvent
+	for _, ev := range tx.Events {
+		if pe, ok := decode(tx, ev); ok {
+			out = append(out, pe)
+		}
+	}
+	return out
+}
+
+// ---------------------------------------------------------------------------
+// Decoder tests
+// ---------------------------------------------------------------------------
+
+// Mint: fromParty=None, toParty=Some → EventMint with all fields populated.
+func TestDecoder_FilterModeAll_Mint(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	ev := makeTransferEvent(testContractID, streaming.MakeNoneField(), streaming.MakeSomePartyField(testRecipient), nil)
+	got := decodeAll(decode, makeTx(1, ev))
+
+	require.Len(t, got, 1)
+	pe := got[0]
+	assert.Equal(t, indexer.EventMint, pe.EventType)
+	assert.Nil(t, pe.FromPartyID)
+	assert.Equal(t, testRecipient, *pe.ToPartyID)
+	assert.Equal(t, testInstrumentID, pe.InstrumentID)
+	assert.Equal(t, testInstrumentAdmin, pe.InstrumentAdmin)
+	assert.Equal(t, testIssuer, pe.Issuer)
+	assert.Equal(t, testAmount, pe.Amount)
+	assert.Equal(t, testContractID, pe.ContractID)
+	assert.Equal(t, int64(1), pe.LedgerOffset)
+	assert.Equal(t, "update-1", pe.TxID)
+	assert.Equal(t, time.Unix(1_700_000_000, 0), pe.EffectiveTime)
+}
+
+// Burn: fromParty=Some, toParty=None → EventBurn.
+func TestDecoder_FilterModeAll_Burn(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	ev := makeTransferEvent(testContractID, streaming.MakeSomePartyField(testSender), streaming.MakeNoneField(), nil)
+	got := decodeAll(decode, makeTx(2, ev))
+
+	require.Len(t, got, 1)
+	pe := got[0]
+	assert.Equal(t, indexer.EventBurn, pe.EventType)
+	assert.Equal(t, testSender, *pe.FromPartyID)
+	assert.Nil(t, pe.ToPartyID)
+}
+
+// Transfer: both parties present → EventTransfer.
+func TestDecoder_FilterModeAll_Transfer(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	ev := makeTransferEvent(testContractID, streaming.MakeSomePartyField(testSender), streaming.MakeSomePartyField(testRecipient), nil)
+	got := decodeAll(decode, makeTx(3, ev))
+
+	require.Len(t, got, 1)
+	pe := got[0]
+	assert.Equal(t, indexer.EventTransfer, pe.EventType)
+	assert.Equal(t, testSender, *pe.FromPartyID)
+	assert.Equal(t, testRecipient, *pe.ToPartyID)
+}
+
+// Both parties None is an invalid event and must be dropped, not indexed.
+func TestDecoder_BothPartiesAbsent_Dropped(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	ev := makeTransferEvent(testContractID, streaming.MakeNoneField(), streaming.MakeNoneField(), nil)
+	got := decodeAll(decode, makeTx(4, ev))
+
+	assert.Empty(t, got)
+}
+
+func TestDecoder_SkipsArchivedEvent(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	ev := streaming.NewLedgerEvent(testContractID, "pkg-id", tokenTransferEventModule, tokenTransferEventEntity, false, nil)
+	got := decodeAll(decode, makeTx(5, ev))
+
+	assert.Empty(t, got)
+}
+
+func TestDecoder_SkipsWrongTemplate(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	ev := streaming.NewLedgerEvent(testContractID, "pkg-id", "OtherModule", "OtherEntity", true, map[string]streaming.FieldValue{})
+	got := decodeAll(decode, makeTx(6, ev))
+
+	assert.Empty(t, got)
+}
+
+func TestDecoder_FilterModeWhitelist_Allowed(t *testing.T) {
+	allowed := []indexer.InstrumentKey{{Admin: testInstrumentAdmin, ID: testInstrumentID}}
+	decode := NewTokenTransferDecoder(indexer.FilterModeWhitelist, allowed, zap.NewNop())
+
+	ev := makeTransferEvent(testContractID, streaming.MakeNoneField(), streaming.MakeSomePartyField(testRecipient), nil)
+	got := decodeAll(decode, makeTx(7, ev))
+
+	require.Len(t, got, 1)
+	assert.Equal(t, testInstrumentID, got[0].InstrumentID)
+}
+
+// Whitelist keys match on (Admin, ID) pairs: same ID under a different admin is blocked.
+func TestDecoder_FilterModeWhitelist_Blocked_WrongAdmin(t *testing.T) {
+	allowed := []indexer.InstrumentKey{{Admin: "other-issuer::xyz", ID: testInstrumentID}}
+	decode := NewTokenTransferDecoder(indexer.FilterModeWhitelist, allowed, zap.NewNop())
+
+	ev := makeTransferEvent(testContractID, streaming.MakeNoneField(), streaming.MakeSomePartyField(testRecipient), nil)
+	got := decodeAll(decode, makeTx(8, ev))
+
+	assert.Empty(t, got)
+}
+
+func TestDecoder_FilterModeWhitelist_Blocked_WrongID(t *testing.T) {
+	allowed := []indexer.InstrumentKey{{Admin: testInstrumentAdmin, ID: "OTHER"}}
+	decode := NewTokenTransferDecoder(indexer.FilterModeWhitelist, allowed, zap.NewNop())
+
+	ev := makeTransferEvent(testContractID, streaming.MakeNoneField(), streaming.MakeSomePartyField(testRecipient), nil)
+	got := decodeAll(decode, makeTx(9, ev))
+
+	assert.Empty(t, got)
+}
+
+func TestDecoder_FilterModeWhitelist_MultipleKeys_MatchingPasses(t *testing.T) {
+	allowed := []indexer.InstrumentKey{
+		{Admin: "other-issuer::xyz", ID: "OTHER"},
+		{Admin: testInstrumentAdmin, ID: testInstrumentID},
+	}
+	decode := NewTokenTransferDecoder(indexer.FilterModeWhitelist, allowed, zap.NewNop())
+
+	ev := makeTransferEvent(testContractID, streaming.MakeNoneField(), streaming.MakeSomePartyField(testRecipient), nil)
+	got := decodeAll(decode, makeTx(10, ev))
+
+	require.Len(t, got, 1)
+}
+
+// Bridge metadata in meta.values must surface as the three optional pointers.
+func TestDecoder_BridgeMetaExtracted(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	meta := streaming.MakeSomeRecordField(map[string]streaming.FieldValue{
+		"values": streaming.MakeTextMapField(map[string]string{
+			metaKeyExternalTxID:    "0xdeadbeef",
+			metaKeyExternalAddress: "0xabc",
+			metaKeyFingerprint:     "fp-1",
+		}),
+	})
+	ev := makeTransferEvent(testContractID,
+		streaming.MakeSomePartyField(testSender), streaming.MakeSomePartyField(testRecipient),
+		map[string]streaming.FieldValue{"meta": meta},
+	)
+	got := decodeAll(decode, makeTx(11, ev))
+
+	require.Len(t, got, 1)
+	pe := got[0]
+	require.NotNil(t, pe.ExternalTxID)
+	assert.Equal(t, "0xdeadbeef", *pe.ExternalTxID)
+	require.NotNil(t, pe.ExternalAddress)
+	assert.Equal(t, "0xabc", *pe.ExternalAddress)
+	require.NotNil(t, pe.Fingerprint)
+	assert.Equal(t, "fp-1", *pe.Fingerprint)
+}
+
+func TestDecoder_BridgeMeta_NoneField_NilPointers(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	// meta = None → all bridge fields should be nil
+	ev := makeTransferEvent(testContractID,
+		streaming.MakeSomePartyField(testSender), streaming.MakeSomePartyField(testRecipient), nil,
+	)
+	got := decodeAll(decode, makeTx(12, ev))
+
+	require.Len(t, got, 1)
+	pe := got[0]
+	assert.Nil(t, pe.ExternalTxID)
+	assert.Nil(t, pe.ExternalAddress)
+	assert.Nil(t, pe.Fingerprint)
+}
+
+func TestDecoder_MultipleEventsInTx_OnlyMatchingReturned(t *testing.T) {
+	decode := NewTokenTransferDecoder(indexer.FilterModeAll, nil, zap.NewNop())
+
+	ev1 := makeTransferEvent("c-1", streaming.MakeNoneField(), streaming.MakeSomePartyField(testRecipient), nil)
+	ev2 := makeTransferEvent("c-2", streaming.MakeSomePartyField(testSender), streaming.MakeNoneField(), nil)
+	ev3 := streaming.NewLedgerEvent("c-3", "pkg", "Other", "Template", true, map[string]streaming.FieldValue{})
+
+	got := decodeAll(decode, makeTx(13, ev1, ev2, ev3))
+
+	require.Len(t, got, 2)
+	assert.Equal(t, "c-1", got[0].ContractID)
+	assert.Equal(t, "c-2", got[1].ContractID)
+}
diff --git a/pkg/indexer/engine/export_test.go b/pkg/indexer/engine/export_test.go
new file mode 100644
index 00000000..9e8e6a7f
--- /dev/null
+++ b/pkg/indexer/engine/export_test.go
@@ -0,0 +1,10 @@
+package engine
+
+import "time"
+
+// SetRetryBaseDelay overrides processorRetryBaseDelay for the duration of a test.
+func SetRetryBaseDelay(t interface{ Cleanup(func()) }, d time.Duration) { + orig := processorRetryBaseDelay + processorRetryBaseDelay = d + t.Cleanup(func() { processorRetryBaseDelay = orig }) +} diff --git a/pkg/indexer/engine/fetcher.go b/pkg/indexer/engine/fetcher.go new file mode 100644 index 00000000..62d816b0 --- /dev/null +++ b/pkg/indexer/engine/fetcher.go @@ -0,0 +1,75 @@ +package engine + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/chainsafe/canton-middleware/pkg/cantonsdk/streaming" + "github.com/chainsafe/canton-middleware/pkg/indexer" + + "go.uber.org/zap" +) + +// Fetcher opens a live Canton stream from a caller-supplied resume offset and +// exposes the resulting batches via Events. +// +// Typical usage: +// +// decode := engine.NewTokenTransferDecoder(mode, allowed, logger) +// f := engine.NewFetcher(streamClient, templateID, decode, logger) +// f.Start(ctx, lastProcessedOffset) +// for batch := range f.Events() { ... } +type Fetcher struct { + stream *streaming.Stream[*indexer.ParsedEvent] + templateID streaming.TemplateID + out <-chan *streaming.Batch[*indexer.ParsedEvent] + once sync.Once + logger *zap.Logger +} + +// NewFetcher creates a new Fetcher. +// +// - streamer: Canton streaming client (handles reconnection, auth, backoff) +// - templateID: DAML template to subscribe to (e.g. TokenTransferEvent) +// - decode: per-event decode function (see NewTokenTransferDecoder) +// - logger: caller-provided logger +func NewFetcher( + streamer streaming.Streamer, + templateID streaming.TemplateID, + decode func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.ParsedEvent, bool), + logger *zap.Logger, +) *Fetcher { + return &Fetcher{ + stream: streaming.NewStream(streamer, decode), + templateID: templateID, + logger: logger, + } +} + +// Start begins streaming from offset. It is non-blocking; the underlying goroutine +// exits when ctx is canceled or the stream closes. 
+//
+// Start must be called exactly once before Events is used. Subsequent calls are no-ops.
+func (f *Fetcher) Start(ctx context.Context, offset int64) {
+	f.once.Do(func() {
+		f.logger.Info("fetcher starting", zap.Int64("resume_offset", offset))
+
+		// lastOffset is updated atomically by the streaming.Client goroutine as
+		// transactions arrive, and read back by its reconnect loop on each new
+		// connection attempt, so resumption continues from the last received transaction.
+		var lastOffset int64
+		atomic.StoreInt64(&lastOffset, offset)
+
+		f.out = f.stream.Subscribe(ctx, streaming.SubscribeRequest{
+			FromOffset:  offset,
+			TemplateIDs: []streaming.TemplateID{f.templateID},
+		}, &lastOffset)
+	})
+}
+
+// Events returns the read-only channel of decoded batches.
+// Must be called after Start. The channel is closed when the stream terminates.
+func (f *Fetcher) Events() <-chan *streaming.Batch[*indexer.ParsedEvent] {
+	return f.out
+}
diff --git a/pkg/indexer/engine/mocks/mock_event_fetcher.go b/pkg/indexer/engine/mocks/mock_event_fetcher.go
new file mode 100644
index 00000000..76757f06
--- /dev/null
+++ b/pkg/indexer/engine/mocks/mock_event_fetcher.go
@@ -0,0 +1,121 @@
+// Code generated by mockery v2.53.6. DO NOT EDIT.
+ +package mocks + +import ( + context "context" + + indexer "github.com/chainsafe/canton-middleware/pkg/indexer" + + mock "github.com/stretchr/testify/mock" + + streaming "github.com/chainsafe/canton-middleware/pkg/cantonsdk/streaming" +) + +// EventFetcher is an autogenerated mock type for the EventFetcher type +type EventFetcher struct { + mock.Mock +} + +type EventFetcher_Expecter struct { + mock *mock.Mock +} + +func (_m *EventFetcher) EXPECT() *EventFetcher_Expecter { + return &EventFetcher_Expecter{mock: &_m.Mock} +} + +// Events provides a mock function with no fields +func (_m *EventFetcher) Events() <-chan *streaming.Batch[*indexer.ParsedEvent] { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Events") + } + + var r0 <-chan *streaming.Batch[*indexer.ParsedEvent] + if rf, ok := ret.Get(0).(func() <-chan *streaming.Batch[*indexer.ParsedEvent]); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *streaming.Batch[*indexer.ParsedEvent]) + } + } + + return r0 +} + +// EventFetcher_Events_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Events' +type EventFetcher_Events_Call struct { + *mock.Call +} + +// Events is a helper method to define mock.On call +func (_e *EventFetcher_Expecter) Events() *EventFetcher_Events_Call { + return &EventFetcher_Events_Call{Call: _e.mock.On("Events")} +} + +func (_c *EventFetcher_Events_Call) Run(run func()) *EventFetcher_Events_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *EventFetcher_Events_Call) Return(_a0 <-chan *streaming.Batch[*indexer.ParsedEvent]) *EventFetcher_Events_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventFetcher_Events_Call) RunAndReturn(run func() <-chan *streaming.Batch[*indexer.ParsedEvent]) *EventFetcher_Events_Call { + _c.Call.Return(run) + return _c +} + +// Start provides a mock function with given fields: ctx, offset +func (_m 
*EventFetcher) Start(ctx context.Context, offset int64) { + _m.Called(ctx, offset) +} + +// EventFetcher_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type EventFetcher_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +// - ctx context.Context +// - offset int64 +func (_e *EventFetcher_Expecter) Start(ctx interface{}, offset interface{}) *EventFetcher_Start_Call { + return &EventFetcher_Start_Call{Call: _e.mock.On("Start", ctx, offset)} +} + +func (_c *EventFetcher_Start_Call) Run(run func(ctx context.Context, offset int64)) *EventFetcher_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *EventFetcher_Start_Call) Return() *EventFetcher_Start_Call { + _c.Call.Return() + return _c +} + +func (_c *EventFetcher_Start_Call) RunAndReturn(run func(context.Context, int64)) *EventFetcher_Start_Call { + _c.Run(run) + return _c +} + +// NewEventFetcher creates a new instance of EventFetcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEventFetcher(t interface { + mock.TestingT + Cleanup(func()) +}) *EventFetcher { + mock := &EventFetcher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/indexer/engine/mocks/mock_store.go b/pkg/indexer/engine/mocks/mock_store.go new file mode 100644 index 00000000..0fc43880 --- /dev/null +++ b/pkg/indexer/engine/mocks/mock_store.go @@ -0,0 +1,392 @@ +// Code generated by mockery v2.53.6. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + indexer "github.com/chainsafe/canton-middleware/pkg/indexer" + engine "github.com/chainsafe/canton-middleware/pkg/indexer/engine" + + mock "github.com/stretchr/testify/mock" +) + +// Store is an autogenerated mock type for the Store type +type Store struct { + mock.Mock +} + +type Store_Expecter struct { + mock *mock.Mock +} + +func (_m *Store) EXPECT() *Store_Expecter { + return &Store_Expecter{mock: &_m.Mock} +} + +// ApplyBalanceDelta provides a mock function with given fields: ctx, partyID, instrumentAdmin, instrumentID, delta +func (_m *Store) ApplyBalanceDelta(ctx context.Context, partyID string, instrumentAdmin string, instrumentID string, delta string) error { + ret := _m.Called(ctx, partyID, instrumentAdmin, instrumentID, delta) + + if len(ret) == 0 { + panic("no return value specified for ApplyBalanceDelta") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string) error); ok { + r0 = rf(ctx, partyID, instrumentAdmin, instrumentID, delta) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_ApplyBalanceDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ApplyBalanceDelta' +type Store_ApplyBalanceDelta_Call struct { + *mock.Call +} + +// ApplyBalanceDelta is a helper method to define mock.On call +// - ctx context.Context +// - partyID string +// - instrumentAdmin string +// - instrumentID string +// - delta string +func (_e *Store_Expecter) ApplyBalanceDelta(ctx interface{}, partyID interface{}, instrumentAdmin interface{}, instrumentID interface{}, delta interface{}) *Store_ApplyBalanceDelta_Call { + return &Store_ApplyBalanceDelta_Call{Call: _e.mock.On("ApplyBalanceDelta", ctx, partyID, instrumentAdmin, instrumentID, delta)} +} + +func (_c *Store_ApplyBalanceDelta_Call) Run(run func(ctx context.Context, partyID string, instrumentAdmin string, instrumentID string, delta string)) 
*Store_ApplyBalanceDelta_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(string)) + }) + return _c +} + +func (_c *Store_ApplyBalanceDelta_Call) Return(_a0 error) *Store_ApplyBalanceDelta_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_ApplyBalanceDelta_Call) RunAndReturn(run func(context.Context, string, string, string, string) error) *Store_ApplyBalanceDelta_Call { + _c.Call.Return(run) + return _c +} + +// ApplySupplyDelta provides a mock function with given fields: ctx, instrumentAdmin, instrumentID, delta +func (_m *Store) ApplySupplyDelta(ctx context.Context, instrumentAdmin string, instrumentID string, delta string) error { + ret := _m.Called(ctx, instrumentAdmin, instrumentID, delta) + + if len(ret) == 0 { + panic("no return value specified for ApplySupplyDelta") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string) error); ok { + r0 = rf(ctx, instrumentAdmin, instrumentID, delta) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_ApplySupplyDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ApplySupplyDelta' +type Store_ApplySupplyDelta_Call struct { + *mock.Call +} + +// ApplySupplyDelta is a helper method to define mock.On call +// - ctx context.Context +// - instrumentAdmin string +// - instrumentID string +// - delta string +func (_e *Store_Expecter) ApplySupplyDelta(ctx interface{}, instrumentAdmin interface{}, instrumentID interface{}, delta interface{}) *Store_ApplySupplyDelta_Call { + return &Store_ApplySupplyDelta_Call{Call: _e.mock.On("ApplySupplyDelta", ctx, instrumentAdmin, instrumentID, delta)} +} + +func (_c *Store_ApplySupplyDelta_Call) Run(run func(ctx context.Context, instrumentAdmin string, instrumentID string, delta string)) *Store_ApplySupplyDelta_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), 
args[1].(string), args[2].(string), args[3].(string)) + }) + return _c +} + +func (_c *Store_ApplySupplyDelta_Call) Return(_a0 error) *Store_ApplySupplyDelta_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_ApplySupplyDelta_Call) RunAndReturn(run func(context.Context, string, string, string) error) *Store_ApplySupplyDelta_Call { + _c.Call.Return(run) + return _c +} + +// InsertEvent provides a mock function with given fields: ctx, event +func (_m *Store) InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (bool, error) { + ret := _m.Called(ctx, event) + + if len(ret) == 0 { + panic("no return value specified for InsertEvent") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *indexer.ParsedEvent) (bool, error)); ok { + return rf(ctx, event) + } + if rf, ok := ret.Get(0).(func(context.Context, *indexer.ParsedEvent) bool); ok { + r0 = rf(ctx, event) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, *indexer.ParsedEvent) error); ok { + r1 = rf(ctx, event) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Store_InsertEvent_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertEvent' +type Store_InsertEvent_Call struct { + *mock.Call +} + +// InsertEvent is a helper method to define mock.On call +// - ctx context.Context +// - event *indexer.ParsedEvent +func (_e *Store_Expecter) InsertEvent(ctx interface{}, event interface{}) *Store_InsertEvent_Call { + return &Store_InsertEvent_Call{Call: _e.mock.On("InsertEvent", ctx, event)} +} + +func (_c *Store_InsertEvent_Call) Run(run func(ctx context.Context, event *indexer.ParsedEvent)) *Store_InsertEvent_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexer.ParsedEvent)) + }) + return _c +} + +func (_c *Store_InsertEvent_Call) Return(inserted bool, err error) *Store_InsertEvent_Call { + _c.Call.Return(inserted, err) + return _c 
+} + +func (_c *Store_InsertEvent_Call) RunAndReturn(run func(context.Context, *indexer.ParsedEvent) (bool, error)) *Store_InsertEvent_Call { + _c.Call.Return(run) + return _c +} + +// LatestOffset provides a mock function with given fields: ctx +func (_m *Store) LatestOffset(ctx context.Context) (int64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for LatestOffset") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) int64); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Store_LatestOffset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LatestOffset' +type Store_LatestOffset_Call struct { + *mock.Call +} + +// LatestOffset is a helper method to define mock.On call +// - ctx context.Context +func (_e *Store_Expecter) LatestOffset(ctx interface{}) *Store_LatestOffset_Call { + return &Store_LatestOffset_Call{Call: _e.mock.On("LatestOffset", ctx)} +} + +func (_c *Store_LatestOffset_Call) Run(run func(ctx context.Context)) *Store_LatestOffset_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Store_LatestOffset_Call) Return(_a0 int64, _a1 error) *Store_LatestOffset_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Store_LatestOffset_Call) RunAndReturn(run func(context.Context) (int64, error)) *Store_LatestOffset_Call { + _c.Call.Return(run) + return _c +} + +// RunInTx provides a mock function with given fields: ctx, fn +func (_m *Store) RunInTx(ctx context.Context, fn func(context.Context, engine.Store) error) error { + ret := _m.Called(ctx, fn) + + if len(ret) == 0 { + panic("no return value specified for RunInTx") + } + + var r0 
error + if rf, ok := ret.Get(0).(func(context.Context, func(context.Context, engine.Store) error) error); ok { + r0 = rf(ctx, fn) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_RunInTx_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RunInTx' +type Store_RunInTx_Call struct { + *mock.Call +} + +// RunInTx is a helper method to define mock.On call +// - ctx context.Context +// - fn func(context.Context , engine.Store) error +func (_e *Store_Expecter) RunInTx(ctx interface{}, fn interface{}) *Store_RunInTx_Call { + return &Store_RunInTx_Call{Call: _e.mock.On("RunInTx", ctx, fn)} +} + +func (_c *Store_RunInTx_Call) Run(run func(ctx context.Context, fn func(context.Context, engine.Store) error)) *Store_RunInTx_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(func(context.Context, engine.Store) error)) + }) + return _c +} + +func (_c *Store_RunInTx_Call) Return(_a0 error) *Store_RunInTx_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_RunInTx_Call) RunAndReturn(run func(context.Context, func(context.Context, engine.Store) error) error) *Store_RunInTx_Call { + _c.Call.Return(run) + return _c +} + +// SaveOffset provides a mock function with given fields: ctx, offset +func (_m *Store) SaveOffset(ctx context.Context, offset int64) error { + ret := _m.Called(ctx, offset) + + if len(ret) == 0 { + panic("no return value specified for SaveOffset") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, offset) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_SaveOffset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveOffset' +type Store_SaveOffset_Call struct { + *mock.Call +} + +// SaveOffset is a helper method to define mock.On call +// - ctx context.Context +// - offset int64 +func (_e *Store_Expecter) SaveOffset(ctx interface{}, offset interface{}) 
*Store_SaveOffset_Call { + return &Store_SaveOffset_Call{Call: _e.mock.On("SaveOffset", ctx, offset)} +} + +func (_c *Store_SaveOffset_Call) Run(run func(ctx context.Context, offset int64)) *Store_SaveOffset_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *Store_SaveOffset_Call) Return(_a0 error) *Store_SaveOffset_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_SaveOffset_Call) RunAndReturn(run func(context.Context, int64) error) *Store_SaveOffset_Call { + _c.Call.Return(run) + return _c +} + +// UpsertToken provides a mock function with given fields: ctx, token +func (_m *Store) UpsertToken(ctx context.Context, token *indexer.Token) error { + ret := _m.Called(ctx, token) + + if len(ret) == 0 { + panic("no return value specified for UpsertToken") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *indexer.Token) error); ok { + r0 = rf(ctx, token) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_UpsertToken_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertToken' +type Store_UpsertToken_Call struct { + *mock.Call +} + +// UpsertToken is a helper method to define mock.On call +// - ctx context.Context +// - token *indexer.Token +func (_e *Store_Expecter) UpsertToken(ctx interface{}, token interface{}) *Store_UpsertToken_Call { + return &Store_UpsertToken_Call{Call: _e.mock.On("UpsertToken", ctx, token)} +} + +func (_c *Store_UpsertToken_Call) Run(run func(ctx context.Context, token *indexer.Token)) *Store_UpsertToken_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexer.Token)) + }) + return _c +} + +func (_c *Store_UpsertToken_Call) Return(_a0 error) *Store_UpsertToken_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_UpsertToken_Call) RunAndReturn(run func(context.Context, *indexer.Token) error) *Store_UpsertToken_Call { + 
_c.Call.Return(run) + return _c +} + +// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewStore(t interface { + mock.TestingT + Cleanup(func()) +}) *Store { + mock := &Store{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/indexer/engine/processor.go b/pkg/indexer/engine/processor.go new file mode 100644 index 00000000..ee764935 --- /dev/null +++ b/pkg/indexer/engine/processor.go @@ -0,0 +1,270 @@ +package engine + +import ( + "context" + "fmt" + "time" + + "github.com/chainsafe/canton-middleware/pkg/cantonsdk/streaming" + "github.com/chainsafe/canton-middleware/pkg/indexer" + + "go.uber.org/zap" +) + +var ( + processorRetryBaseDelay = 5 * time.Second + processorRetryMaxDelay = 60 * time.Second +) + +// EventFetcher is the interface the Processor uses to start and consume the ledger stream. +// +//go:generate mockery --name EventFetcher --output mocks --outpkg mocks --filename mock_event_fetcher.go --with-expecter +type EventFetcher interface { + // Start begins streaming from offset in a background goroutine. + // Must be called exactly once before Events is used. + Start(ctx context.Context, offset int64) + + // Events returns the read-only channel of decoded batches. + // The channel is closed when the stream terminates. + Events() <-chan *streaming.Batch[*indexer.ParsedEvent] +} + +// Store defines the persistence contract for the indexer Processor. +// +// The key invariant: offset and events from the same LedgerTransaction must be +// written atomically. This guarantees that after a restart the processor resumes +// from a consistent point — no event is lost and no event is double-written. 
+// +//go:generate mockery --name Store --output mocks --outpkg mocks --filename mock_store.go --with-expecter +type Store interface { + // LatestOffset returns the last successfully persisted ledger offset. + // Returns 0 and no error when no offset has been stored yet (fresh start). + // Called once at startup, outside any transaction. + LatestOffset(ctx context.Context) (int64, error) + + // RunInTx executes fn inside a single database transaction. + // On success fn's return value is nil and the transaction is committed. + // On any error the transaction is rolled back and the error is returned. + // The Store passed to fn is scoped to the transaction — all methods on it + // participate in the same underlying DB transaction. + RunInTx(ctx context.Context, fn func(ctx context.Context, tx Store) error) error + + // InsertEvent persists one ParsedEvent by ContractID. + // Returns inserted=false when the event already exists and should therefore not + // mutate any derived state a second time. + InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (inserted bool, err error) + + // SaveOffset advances the stored ledger offset after all newly inserted events in + // the transaction have updated derived state. It must be safe to call even when the + // batch was empty or every event was already present. + SaveOffset(ctx context.Context, offset int64) error + + // UpsertToken records a token deployment on first observation. + // Subsequent calls for the same {InstrumentAdmin, InstrumentID} are no-ops + // (ON CONFLICT DO NOTHING). + UpsertToken(ctx context.Context, token *indexer.Token) error + + // ApplyBalanceDelta adjusts a party's token balance by delta (signed decimal string). + // The balance row is created at zero if it does not yet exist, then delta is added. 
+ // The store must also update Token.HolderCount atomically: + // - increment when a party's balance transitions from zero to positive + // - decrement when a party's balance transitions from positive to zero + ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error + + // ApplySupplyDelta adjusts a token's TotalSupply by delta (signed decimal string). + // Called once per mint (+amount) or burn (-amount). Transfer events must not call this. + ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error +} + +// Processor is the main run loop of the indexer. It wires the EventFetcher to the +// Store and writes decoded events atomically. +// +// Processing is sequential — one batch at a time. The ordering guarantee comes from +// the Canton ledger: transactions within a party's projection are delivered in +// strictly increasing offset order. +type Processor struct { + fetcher EventFetcher + store Store + logger *zap.Logger +} + +// NewProcessor creates a Processor. +func NewProcessor(fetcher EventFetcher, store Store, logger *zap.Logger) *Processor { + return &Processor{ + fetcher: fetcher, + store: store, + logger: logger, + } +} + +// Run starts the indexer loop. It blocks until ctx is canceled or the fetcher +// channel closes, then returns ctx.Err() or nil respectively. +// +// On startup Run loads the resume offset from the store and passes it to the fetcher, +// so callers do not need to track offsets themselves. +// +// If processBatch fails (store error) Run retries the same batch with exponential +// backoff (5s → 60s) until it succeeds or ctx is canceled. The offset is never +// advanced past a failed batch — no event is silently dropped. 
+func (p *Processor) Run(ctx context.Context) error { + offset, err := p.store.LatestOffset(ctx) + if err != nil { + return fmt.Errorf("load resume offset: %w", err) + } + + p.logger.Info("indexer processor starting", zap.Int64("resume_offset", offset)) + p.fetcher.Start(ctx, offset) + + for { + select { + case batch, ok := <-p.fetcher.Events(): + if !ok { + p.logger.Info("indexer stream closed") + return nil + } + if err := p.processBatchWithRetry(ctx, batch); err != nil { + // Only reachable when ctx is canceled. + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// processBatchWithRetry calls processBatch and retries with exponential backoff on failure. +// It returns only when the batch is successfully persisted or ctx is canceled. +func (p *Processor) processBatchWithRetry(ctx context.Context, batch *streaming.Batch[*indexer.ParsedEvent]) error { + delay := processorRetryBaseDelay + + for { + err := p.processBatch(ctx, batch) + if err == nil { + return nil + } + + p.logger.Error("failed to process batch, retrying", + zap.String("update_id", batch.UpdateID), + zap.Int64("offset", batch.Offset), + zap.Duration("backoff", delay), + zap.Error(err), + ) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + + delay = min(delay*2, processorRetryMaxDelay) + } +} + +// processBatch persists a single decoded batch inside a single database transaction. +// Each event is inserted before its derived state is mutated so replayed transactions +// can skip already-indexed events without double-applying balances or supply. +// All writes — event inserts, token upserts, supply/balance deltas, and offset advance — +// are committed atomically. On any error the transaction is rolled back and the caller retries. 
+func (p *Processor) processBatch(ctx context.Context, batch *streaming.Batch[*indexer.ParsedEvent]) error {
+	err := p.store.RunInTx(ctx, func(ctx context.Context, tx Store) error {
+		for _, e := range batch.Items {
+			inserted, err := tx.InsertEvent(ctx, e)
+			if err != nil {
+				return fmt.Errorf("insert event: %w", err)
+			}
+			if !inserted {
+				continue
+			}
+
+			if err := tx.UpsertToken(ctx, tokenFromEvent(e)); err != nil {
+				return fmt.Errorf("upsert token: %w", err)
+			}
+
+			if admin, id, delta, ok := supplyDeltaFromEvent(e); ok {
+				if err := tx.ApplySupplyDelta(ctx, admin, id, delta); err != nil {
+					return fmt.Errorf("apply supply delta: %w", err)
+				}
+			}
+
+			for _, u := range balanceUpdatesFromEvent(e) {
+				if err := tx.ApplyBalanceDelta(ctx, u[0], e.InstrumentAdmin, e.InstrumentID, u[1]); err != nil {
+					return fmt.Errorf("apply balance delta: %w", err)
+				}
+			}
+		}
+
+		if err := tx.SaveOffset(ctx, batch.Offset); err != nil {
+			return fmt.Errorf("save offset: %w", err)
+		}
+
+		return nil
+	})
+	if err != nil {
+		return fmt.Errorf("tx at offset %d: %w", batch.Offset, err)
+	}
+
+	if len(batch.Items) > 0 {
+		p.logger.Debug("indexed batch",
+			zap.String("update_id", batch.UpdateID),
+			zap.Int64("offset", batch.Offset),
+			zap.Int("events", len(batch.Items)),
+		)
+	}
+
+	return nil
+}
+
+// tokenFromEvent constructs a Token from a ParsedEvent for UpsertToken.
+// TotalSupply and HolderCount are left zero — the store initializes them on first
+// insert and maintains them via ApplySupplyDelta / ApplyBalanceDelta thereafter.
+func tokenFromEvent(e *indexer.ParsedEvent) *indexer.Token {
+	return &indexer.Token{
+		InstrumentAdmin: e.InstrumentAdmin,
+		InstrumentID:    e.InstrumentID,
+		Issuer:          e.Issuer,
+		FirstSeenOffset: e.LedgerOffset,
+		FirstSeenAt:     e.EffectiveTime,
+	}
+}
+
+// supplyDeltaFromEvent returns the signed supply delta for MINT (+amount) and
+// BURN (-amount). Returns ok=false for TRANSFER, which leaves total supply unchanged.
+func supplyDeltaFromEvent(e *indexer.ParsedEvent) (instrumentAdmin, instrumentID, delta string, ok bool) { + switch e.EventType { + case indexer.EventMint: + return e.InstrumentAdmin, e.InstrumentID, e.Amount, true + case indexer.EventBurn: + return e.InstrumentAdmin, e.InstrumentID, "-" + e.Amount, true + default: + return "", "", "", false + } +} + +// balanceUpdatesFromEvent returns [partyID, signedDelta] pairs for each balance +// affected by an event. Mirrors supplyDeltaFromEvent but at the per-party level. +// +// MINT: toParty +amount +// BURN: fromParty −amount +// TRANSFER: fromParty −amount, toParty +amount +func balanceUpdatesFromEvent(e *indexer.ParsedEvent) [][2]string { + neg := "-" + e.Amount + switch e.EventType { + case indexer.EventMint: + if e.ToPartyID == nil { + return nil + } + return [][2]string{{*e.ToPartyID, e.Amount}} + case indexer.EventBurn: + if e.FromPartyID == nil { + return nil + } + return [][2]string{{*e.FromPartyID, neg}} + case indexer.EventTransfer: + if e.FromPartyID == nil || e.ToPartyID == nil { + return nil + } + return [][2]string{{*e.FromPartyID, neg}, {*e.ToPartyID, e.Amount}} + default: + return nil + } +} diff --git a/pkg/indexer/engine/processor_test.go b/pkg/indexer/engine/processor_test.go new file mode 100644 index 00000000..6d4af7e7 --- /dev/null +++ b/pkg/indexer/engine/processor_test.go @@ -0,0 +1,306 @@ +package engine_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/chainsafe/canton-middleware/pkg/indexer/engine" + "github.com/chainsafe/canton-middleware/pkg/indexer/engine/mocks" + + "github.com/chainsafe/canton-middleware/pkg/cantonsdk/streaming" + "github.com/chainsafe/canton-middleware/pkg/indexer" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +const ( + testInstrumentID = "DEMO" + testInstrumentAdmin = "issuer-party::abc123" + testIssuer = "issuer-party::abc123" + testAmount = 
"100.000000000000000000" + testRecipient = "recipient-party::def456" + testSender = "sender-party::ghi789" + testContractID = "contract-id-1" +) + +// --------------------------------------------------------------------------- +// Builders +// --------------------------------------------------------------------------- + +func mintEvent() *indexer.ParsedEvent { + r := testRecipient + return &indexer.ParsedEvent{ + EventType: indexer.EventMint, + InstrumentID: testInstrumentID, + InstrumentAdmin: testInstrumentAdmin, + Issuer: testIssuer, + Amount: testAmount, + ToPartyID: &r, + ContractID: testContractID, + LedgerOffset: 1, + EffectiveTime: time.Unix(1_700_000_000, 0), + } +} + +func burnEvent() *indexer.ParsedEvent { + s := testSender + return &indexer.ParsedEvent{ + EventType: indexer.EventBurn, + InstrumentID: testInstrumentID, + InstrumentAdmin: testInstrumentAdmin, + Issuer: testIssuer, + Amount: testAmount, + FromPartyID: &s, + ContractID: testContractID, + LedgerOffset: 2, + EffectiveTime: time.Unix(1_700_000_000, 0), + } +} + +func transferEvent() *indexer.ParsedEvent { + s := testSender + r := testRecipient + return &indexer.ParsedEvent{ + EventType: indexer.EventTransfer, + InstrumentID: testInstrumentID, + InstrumentAdmin: testInstrumentAdmin, + Issuer: testIssuer, + Amount: testAmount, + FromPartyID: &s, + ToPartyID: &r, + ContractID: testContractID, + LedgerOffset: 3, + EffectiveTime: time.Unix(1_700_000_000, 0), + } +} + +func makeBatch(offset int64, events ...*indexer.ParsedEvent) *streaming.Batch[*indexer.ParsedEvent] { + return &streaming.Batch[*indexer.ParsedEvent]{ + Offset: offset, + UpdateID: "update-" + string(rune('0'+offset)), + Items: events, + } +} + +func feedCh(batches ...*streaming.Batch[*indexer.ParsedEvent]) <-chan *streaming.Batch[*indexer.ParsedEvent] { + ch := make(chan *streaming.Batch[*indexer.ParsedEvent], len(batches)) + for _, b := range batches { + ch <- b + } + close(ch) + return ch +} + +func setupRunInTx(store *mocks.Store) 
{ + store.EXPECT().RunInTx(mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, fn func(context.Context, engine.Store) error) error { + return fn(ctx, store) + }) +} + +// --------------------------------------------------------------------------- +// Lifecycle +// --------------------------------------------------------------------------- + +func TestProcessor_Run_LoadOffsetError(t *testing.T) { + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + loadErr := errors.New("db down") + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(0), loadErr) + + err := engine.NewProcessor(fetcher, store, zap.NewNop()).Run(context.Background()) + require.Error(t, err) + assert.ErrorIs(t, err, loadErr) +} + +func TestProcessor_Run_StreamClosed_ReturnsNil(t *testing.T) { + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(5), nil) + fetcher.EXPECT().Start(mock.Anything, int64(5)) + fetcher.EXPECT().Events().Return(feedCh()) + + assert.NoError(t, engine.NewProcessor(fetcher, store, zap.NewNop()).Run(context.Background())) +} + +func TestProcessor_Run_ContextCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + + ch := make(chan *streaming.Batch[*indexer.ParsedEvent]) + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(0), nil) + fetcher.EXPECT().Start(mock.Anything, int64(0)) + fetcher.EXPECT().Events().Return((<-chan *streaming.Batch[*indexer.ParsedEvent])(ch)) + + done := make(chan error, 1) + go func() { done <- engine.NewProcessor(fetcher, store, zap.NewNop()).Run(ctx) }() + + cancel() + assert.ErrorIs(t, <-done, context.Canceled) +} + +// --------------------------------------------------------------------------- +// Event-type store call verification +// (also implicitly tests tokenFromEvent / supplyDeltaFromEvent / balanceUpdatesFromEvent) +// 
--------------------------------------------------------------------------- + +func TestProcessor_Run_MintBatch(t *testing.T) { + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + ev := mintEvent() + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(0), nil) + fetcher.EXPECT().Start(mock.Anything, int64(0)) + fetcher.EXPECT().Events().Return(feedCh(makeBatch(1, ev))) + + setupRunInTx(store) + store.EXPECT().InsertEvent(mock.Anything, ev).Return(true, nil) + store.EXPECT().UpsertToken(mock.Anything, &indexer.Token{ + InstrumentAdmin: testInstrumentAdmin, + InstrumentID: testInstrumentID, + Issuer: testIssuer, + FirstSeenOffset: 1, + FirstSeenAt: time.Unix(1_700_000_000, 0), + }).Return(nil) + store.EXPECT().ApplySupplyDelta(mock.Anything, testInstrumentAdmin, testInstrumentID, testAmount).Return(nil) + store.EXPECT().ApplyBalanceDelta(mock.Anything, testRecipient, testInstrumentAdmin, testInstrumentID, testAmount).Return(nil) + store.EXPECT().SaveOffset(mock.Anything, int64(1)).Return(nil) + + require.NoError(t, engine.NewProcessor(fetcher, store, zap.NewNop()).Run(context.Background())) +} + +func TestProcessor_Run_BurnBatch(t *testing.T) { + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + ev := burnEvent() + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(0), nil) + fetcher.EXPECT().Start(mock.Anything, int64(0)) + fetcher.EXPECT().Events().Return(feedCh(makeBatch(2, ev))) + + setupRunInTx(store) + store.EXPECT().InsertEvent(mock.Anything, ev).Return(true, nil) + store.EXPECT().UpsertToken(mock.Anything, &indexer.Token{ + InstrumentAdmin: testInstrumentAdmin, + InstrumentID: testInstrumentID, + Issuer: testIssuer, + FirstSeenOffset: 2, + FirstSeenAt: time.Unix(1_700_000_000, 0), + }).Return(nil) + store.EXPECT().ApplySupplyDelta(mock.Anything, testInstrumentAdmin, testInstrumentID, "-"+testAmount).Return(nil) + store.EXPECT().ApplyBalanceDelta(mock.Anything, testSender, testInstrumentAdmin, testInstrumentID, 
"-"+testAmount).Return(nil) + store.EXPECT().SaveOffset(mock.Anything, int64(2)).Return(nil) + + require.NoError(t, engine.NewProcessor(fetcher, store, zap.NewNop()).Run(context.Background())) +} + +func TestProcessor_Run_TransferBatch(t *testing.T) { + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + ev := transferEvent() + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(0), nil) + fetcher.EXPECT().Start(mock.Anything, int64(0)) + fetcher.EXPECT().Events().Return(feedCh(makeBatch(3, ev))) + + setupRunInTx(store) + store.EXPECT().InsertEvent(mock.Anything, ev).Return(true, nil) + store.EXPECT().UpsertToken(mock.Anything, &indexer.Token{ + InstrumentAdmin: testInstrumentAdmin, + InstrumentID: testInstrumentID, + Issuer: testIssuer, + FirstSeenOffset: 3, + FirstSeenAt: time.Unix(1_700_000_000, 0), + }).Return(nil) + // Transfer: no ApplySupplyDelta. + store.EXPECT().ApplyBalanceDelta(mock.Anything, testSender, testInstrumentAdmin, testInstrumentID, "-"+testAmount).Return(nil) + store.EXPECT().ApplyBalanceDelta(mock.Anything, testRecipient, testInstrumentAdmin, testInstrumentID, testAmount).Return(nil) + store.EXPECT().SaveOffset(mock.Anything, int64(3)).Return(nil) + + require.NoError(t, engine.NewProcessor(fetcher, store, zap.NewNop()).Run(context.Background())) +} + +func TestProcessor_Run_EmptyBatch_AdvancesOffset(t *testing.T) { + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(9), nil) + fetcher.EXPECT().Start(mock.Anything, int64(9)) + fetcher.EXPECT().Events().Return(feedCh(makeBatch(10))) + + setupRunInTx(store) + store.EXPECT().SaveOffset(mock.Anything, int64(10)).Return(nil) + + require.NoError(t, engine.NewProcessor(fetcher, store, zap.NewNop()).Run(context.Background())) +} + +func TestProcessor_Run_DuplicateEvent_SkipsDerivedStateButAdvancesOffset(t *testing.T) { + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + ev := mintEvent() + 
+ store.EXPECT().LatestOffset(mock.Anything).Return(int64(4), nil) + fetcher.EXPECT().Start(mock.Anything, int64(4)) + fetcher.EXPECT().Events().Return(feedCh(makeBatch(5, ev))) + + setupRunInTx(store) + store.EXPECT().InsertEvent(mock.Anything, ev).Return(false, nil) + store.EXPECT().SaveOffset(mock.Anything, int64(5)).Return(nil) + + require.NoError(t, engine.NewProcessor(fetcher, store, zap.NewNop()).Run(context.Background())) +} + +// --------------------------------------------------------------------------- +// Retry behavior +// --------------------------------------------------------------------------- + +func TestProcessor_Run_StoreError_Retries(t *testing.T) { + engine.SetRetryBaseDelay(t, time.Millisecond) + + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(0), nil) + fetcher.EXPECT().Start(mock.Anything, int64(0)) + fetcher.EXPECT().Events().Return(feedCh(makeBatch(1))) + + store.EXPECT().RunInTx(mock.Anything, mock.Anything). + Return(errors.New("transient error")).Once() + store.EXPECT().RunInTx(mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, fn func(context.Context, engine.Store) error) error { + return fn(ctx, store) + }).Once() + store.EXPECT().SaveOffset(mock.Anything, int64(1)).Return(nil) + + require.NoError(t, engine.NewProcessor(fetcher, store, zap.NewNop()).Run(context.Background())) +} + +func TestProcessor_Run_ContextCancelledDuringRetry(t *testing.T) { + engine.SetRetryBaseDelay(t, time.Hour) + + ctx, cancel := context.WithCancel(context.Background()) + store := mocks.NewStore(t) + fetcher := mocks.NewEventFetcher(t) + + store.EXPECT().LatestOffset(mock.Anything).Return(int64(0), nil) + fetcher.EXPECT().Start(mock.Anything, int64(0)) + fetcher.EXPECT().Events().Return(feedCh(makeBatch(1))) + + store.EXPECT().RunInTx(mock.Anything, mock.Anything). 
+ RunAndReturn(func(_ context.Context, _ func(context.Context, engine.Store) error) error { + cancel() + return errors.New("persistent error") + }) + + err := engine.NewProcessor(fetcher, store, zap.NewNop()).Run(ctx) + assert.ErrorIs(t, err, context.Canceled) +} diff --git a/pkg/indexer/types.go b/pkg/indexer/types.go new file mode 100644 index 00000000..33ac187b --- /dev/null +++ b/pkg/indexer/types.go @@ -0,0 +1,133 @@ +package indexer + +import "time" + +// EventType classifies a TokenTransferEvent as MINT, BURN, or TRANSFER. +// Derived from the fromParty/toParty Optional fields — mirrors ERC-20 Transfer semantics: +// +// MINT: fromParty = None, toParty = Some(recipient) +// BURN: fromParty = Some(owner), toParty = None +// TRANSFER: fromParty = Some(sender), toParty = Some(receiver) +type EventType string + +const ( + EventMint EventType = "MINT" + EventBurn EventType = "BURN" + EventTransfer EventType = "TRANSFER" +) + +// ParsedEvent is a fully decoded TokenTransferEvent ready for the processor. +// +// Fields map directly to the DAML TokenTransferEvent template in CIP56.Events: +// +// issuer → Issuer +// instrumentId → InstrumentID (id field) + InstrumentAdmin (admin field) +// fromParty → FromPartyID (*string, nil for mints) +// toParty → ToPartyID (*string, nil for burns) +// amount → Amount (decimal string) +// timestamp → Timestamp (contract-level time, from the DAML event) +// meta.values → ExternalTxID, ExternalAddress, Fingerprint (bridge context, nil for transfers) +// +// ContractID (the TokenTransferEvent contract ID) is the idempotency key used +// as event_id in the store — guaranteed unique across the ledger. +// +// Primary identity throughout is canton_party_id — no EVM address at this layer. +type ParsedEvent struct { + // Instrument identification — fully qualified by both fields. + InstrumentID string // instrumentId.id — token identifier (e.g. 
"DEMO", "PROMPT") + InstrumentAdmin string // instrumentId.admin — token admin/issuer party + + // Issuer of the TokenTransferEvent contract (the token config issuer). + Issuer string + + // Transfer semantics, mirroring ERC-20 Transfer(from, to, value). + EventType EventType + Amount string // decimal string, e.g. "1.500000000000000000" + FromPartyID *string // nil for mints + ToPartyID *string // nil for burns + + // Bridge audit context extracted from meta.values (nil for native peer-to-peer transfers). + ExternalTxID *string // meta["bridge.externalTxId"] — EVM transaction hash + ExternalAddress *string // meta["bridge.externalAddress"] — EVM destination address + Fingerprint *string // meta["bridge.fingerprint"] — user fingerprint + + // Provenance. + ContractID string // TokenTransferEvent contract ID — idempotency key (event_id in store) + TxID string // Ledger transaction UpdateId + LedgerOffset int64 // Ledger offset of the containing transaction + Timestamp time.Time // Contract-level time from TokenTransferEvent.timestamp + EffectiveTime time.Time // Ledger transaction effective time +} + +// InstrumentKey is the Canton equivalent of an ERC-20 contract address. +// It uniquely identifies a CIP56 token deployment. +// Corresponds to the DAML InstrumentId{admin: Party, id: Text} record. +// +// instrumentId.id alone is NOT unique — two different issuers can both deploy +// a token with id="DEMO". The full {Admin, ID} pair IS unique and is the correct +// key for whitelisting specific token deployments. +type InstrumentKey struct { + Admin string // instrumentId.admin — the token admin/issuer party + ID string // instrumentId.id — the token identifier (e.g. "DEMO") +} + +// Token represents a CIP56 token deployment, uniquely identified by {InstrumentAdmin, InstrumentID}. +// A Token record is created the first time the indexer observes a TokenTransferEvent for a given +// instrument pair. 
It tracks the ERC-20-equivalent on-chain state derivable from transfer events. +// +// ERC-20 parallel: +// +// symbol() → InstrumentID +// owner/minter → InstrumentAdmin, Issuer +// totalSupply() → TotalSupply (maintained: +amount on MINT, -amount on BURN) +// HolderCount (non-standard but shown on all block explorers) +type Token struct { + // Identity — canonical composite key. + InstrumentAdmin string // instrumentId.admin — token admin/issuer party (ERC-20: deployer) + InstrumentID string // instrumentId.id — token symbol/identifier (ERC-20: symbol, e.g. "DEMO") + + // Roles. + Issuer string // issuer party on the TokenTransferEvent contract (ERC-20: minter role) + + // Supply (ERC-20: totalSupply()). + // Running total, always ≥ 0. Incremented by each MINT amount, decremented by each BURN amount. + // Updated atomically with every mint/burn via Store.ApplySupplyDelta. + TotalSupply string // decimal string, e.g. "1000000.000000000000000000" + + // Holders (ERC-20: no standard equivalent, but a standard block-explorer metric). + // Count of distinct parties currently holding a non-zero balance. + // The store increments this when a balance first becomes positive, decrements when it returns to zero. + HolderCount int64 + + // Provenance. + FirstSeenOffset int64 // ledger offset when this token was first indexed + FirstSeenAt time.Time // ledger effective time when this token was first indexed +} + +// Balance is a party's current token holding for a specific instrument. +// (ERC-20: the per-address entry in the balances mapping, i.e. balanceOf(address).) +// +// Amount is a non-negative decimal string representing the live balance, +// e.g. "1500.000000000000000000". Updated by the store via delta arithmetic +// (Store.ApplyBalanceDelta) — the store adds the signed delta to the persisted value. 
+type Balance struct { + PartyID string // canton party (ERC-20: address) + InstrumentAdmin string // instrumentId.admin + InstrumentID string // instrumentId.id + Amount string // current balance, decimal string ≥ 0 +} + +// FilterMode controls which token instruments the Parser processes. +type FilterMode int + +const ( + // FilterModeAll indexes events from every instrument — equivalent to a global + // ERC-20 Transfer log covering all CIP56 token deployments visible to the indexer. + FilterModeAll FilterMode = iota + + // FilterModeWhitelist indexes only events whose InstrumentKey{Admin, ID} is in + // the allowed set. Use this for an operator who manages a fixed set of tokens. + // Both Admin and ID must match — this is the Canton equivalent of whitelisting + // by ERC-20 contract address. + FilterModeWhitelist +)