Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions pkg/cardinal/cardinal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/argus-labs/world-engine/pkg/cardinal/internal/command"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/ecs"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/event"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/performance"
"github.com/argus-labs/world-engine/pkg/cardinal/snapshot"
"github.com/argus-labs/world-engine/pkg/micro"
"github.com/argus-labs/world-engine/pkg/telemetry"
Expand Down Expand Up @@ -108,9 +109,22 @@ func NewWorld(opts WorldOptions) (*World, error) {

// Create the debug module only if debug is on.
if *options.Debug {
debug := newDebugModule(world)
debug, err := newDebugModule(world)
if err != nil {
return nil, eris.Wrap(err, "failed to create debug module")
}
debug.control.isPaused.Store(true)
world.debug = &debug

world.world.SetOnSystemRun(func(phase, name string, startOffsetNs, durationNs int64) {
world.debug.recordSpan(performance.TickSpan{
TickHeight: world.currentTick.height,
Phase: phase,
SystemName: name,
StartOffsetNs: startOffsetNs,
DurationNs: durationNs,
})
})
}

return world, nil
Expand Down Expand Up @@ -193,13 +207,15 @@ func (w *World) Tick(ctx context.Context, timestamp time.Time) error {
_ = w.commands.Drain()

w.currentTick.timestamp = timestamp
w.debug.startPerfTick(w.currentTick.height, timestamp)

// Tick ECS world.
err := w.world.Tick()
err := w.world.Tick(timestamp)
if err != nil {
return eris.Wrap(err, "one or more systems failed")
}

w.debug.recordTick(w.currentTick.height, timestamp)

// Emit events.
if err := w.events.Dispatch(); err != nil {
w.tel.Logger.Warn().Err(err).Msg("errors encountered dispatching events")
Expand Down
111 changes: 109 additions & 2 deletions pkg/cardinal/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cardinal

import (
"context"
"math"
"net/http"
"sync/atomic"
"time"
Expand All @@ -14,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/argus-labs/world-engine/pkg/cardinal/internal/performance"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/schema"
cardinalv1 "github.com/argus-labs/world-engine/proto/gen/go/worldengine/cardinal/v1"
"github.com/argus-labs/world-engine/proto/gen/go/worldengine/cardinal/v1/cardinalv1connect"
Expand All @@ -28,11 +30,16 @@ type debugModule struct {
commands map[string]*structpb.Struct
events map[string]*structpb.Struct
components map[string]*structpb.Struct
perf *performance.Collector
}

var _ cardinalv1connect.DebugServiceHandler = (*debugModule)(nil)

func newDebugModule(world *World) debugModule {
func newDebugModule(world *World) (debugModule, error) {
perf, err := performance.NewCollector(world.options.TickRate)
if err != nil {
return debugModule{}, eris.Wrap(err, "failed to create performance collector")
}
return debugModule{
world: world,
control: newTickControl(),
Expand All @@ -43,7 +50,8 @@ func newDebugModule(world *World) debugModule {
Anonymous: true, // Don't add $id based on package path
ExpandedStruct: true, // Inline the struct fields directly
},
}
perf: perf,
}, nil
}

// Init initializes and starts the connect server for the debug service.
Expand Down Expand Up @@ -141,6 +149,105 @@ func (d *debugModule) Introspect(
}), nil
}

// PerfOverview is the Connect handler that reports aggregated tick timing.
//
// It reads the requested window (in seconds) from the request, queries the
// performance collector for statistics and freshness data for that window as
// of the current wall-clock time, and returns them together with the world's
// configured tick rate and the per-tick budget (1000 ms divided by the tick
// rate). The sample and overrun counts are clamped to math.MaxInt32 before
// being narrowed to the int32 response fields. The returned error is always nil.
func (d *debugModule) PerfOverview(
	_ context.Context,
	req *connect.Request[cardinalv1.PerfOverviewRequest],
) (*connect.Response[cardinalv1.PerfOverviewResponse], error) {
	window := req.Msg.GetWindowSeconds()
	tickRate := d.world.options.TickRate
	stats, freshness := d.perf.Overview(int(window), time.Now())

	// Aggregate timing figures for the samples inside the window.
	samples := &cardinalv1.TickSampleStats{
		Count:        int32(min(stats.Count, math.MaxInt32)), //nolint:gosec // bounded by ring capacity
		AvgMs:        stats.AvgMs,
		P95Ms:        stats.P95Ms,
		MaxMs:        stats.MaxMs,
		OverrunCount: int32(min(stats.OverrunCount, math.MaxInt32)), //nolint:gosec // bounded by Count
		OverrunRate:  stats.OverrunRate,
	}

	// How recent the latest recorded tick is.
	fresh := &cardinalv1.TickFreshness{
		LastTickHeight: freshness.LastTickHeight,
		LastTickAt:     timestamppb.New(freshness.LastTickAt),
		AgeMs:          freshness.AgeMs,
	}

	resp := &cardinalv1.PerfOverviewResponse{
		TickRateHz:    uint32(tickRate),
		TickBudgetMs:  1000.0 / tickRate,
		WindowSeconds: window,
		Samples:       samples,
		Freshness:     fresh,
	}
	return connect.NewResponse(resp), nil
}

// PerfSchedule is the Connect handler that reports per-system span timelines.
//
// It queries the performance collector for the ticks in the requested window
// (in seconds) as of the current wall-clock time and converts each tick, with
// its recorded system spans, into the proto timeline representation. The
// response also carries the world's configured tick rate and the per-tick
// budget (1000 ms divided by the tick rate). The returned error is always nil.
func (d *debugModule) PerfSchedule(
	_ context.Context,
	req *connect.Request[cardinalv1.PerfScheduleRequest],
) (*connect.Response[cardinalv1.PerfScheduleResponse], error) {
	window := req.Msg.GetWindowSeconds()
	tickRate := d.world.options.TickRate
	recorded := d.perf.Schedule(int(window), time.Now())

	timelines := make([]*cardinalv1.TickTimeline, 0, len(recorded))
	for _, tick := range recorded {
		// Build the timeline shell first, then fill in its spans in place.
		timeline := &cardinalv1.TickTimeline{
			TickHeight: tick.TickHeight,
			TickStart:  timestamppb.New(tick.TickStart),
			Spans:      make([]*cardinalv1.SystemSpan, 0, len(tick.Spans)),
		}
		for _, s := range tick.Spans {
			timeline.Spans = append(timeline.Spans, &cardinalv1.SystemSpan{
				Phase:         s.Phase,
				System:        s.SystemName,
				StartOffsetNs: s.StartOffsetNs,
				DurationNs:    s.DurationNs,
			})
		}
		timelines = append(timelines, timeline)
	}

	resp := &cardinalv1.PerfScheduleResponse{
		TickRateHz:    uint32(tickRate),
		TickBudgetMs:  1000.0 / tickRate,
		WindowSeconds: window,
		Ticks:         timelines,
	}
	return connect.NewResponse(resp), nil
}

// recordTick measures how long the tick took and hands the sample to the
// performance collector.
//
// The duration is the wall-clock time elapsed between tickStart and the moment
// this method runs, expressed in milliseconds. The sample is flagged as an
// overrun when that duration exceeds the per-tick budget (1000 ms divided by
// the configured tick rate). Calling it on a nil receiver is a no-op, so
// callers do not need to check whether the debug module is enabled.
func (d *debugModule) recordTick(tickHeight uint64, tickStart time.Time) {
	if d == nil {
		return
	}

	finished := time.Now()
	elapsedMs := finished.Sub(tickStart).Seconds() * 1000
	budgetMs := 1000.0 / d.world.options.TickRate

	sample := performance.TickSample{
		At:         finished,
		TickHeight: tickHeight,
		DurationMs: elapsedMs,
		Overrun:    elapsedMs > budgetMs,
	}
	d.perf.RecordTick(sample)
}

// startPerfTick tells the performance collector that a new tick is starting so
// that it can set up span storage for it.
//
// If the collector rejects the tick, the error is logged at debug level on the
// "debug" logger and otherwise ignored; the tick itself is not affected.
// Calling it on a nil receiver is a no-op.
func (d *debugModule) startPerfTick(tickHeight uint64, tickStart time.Time) {
	if d == nil {
		return
	}

	err := d.perf.StartTick(tickHeight, tickStart)
	if err == nil {
		return
	}

	// Best-effort instrumentation: report the problem and carry on.
	lg := d.world.tel.GetLogger("debug")
	lg.Debug().
		Err(err).
		Uint64("tick_height", tickHeight).
		Msg("perf StartTick out of sequence, skipping span data for this tick")
}

// recordSpan forwards a single per-system timing span to the performance
// collector. Calling it on a nil receiver is a no-op.
func (d *debugModule) recordSpan(span performance.TickSpan) {
	if d != nil {
		d.perf.RecordSpan(span)
	}
}

// buildTypeSchemas converts the internal schema cache to proto TypeSchema messages.
func (d *debugModule) buildTypeSchemas(cache map[string]*structpb.Struct) []*cardinalv1.TypeSchema {
schemas := make([]*cardinalv1.TypeSchema, 0, len(cache))
Expand Down
13 changes: 7 additions & 6 deletions pkg/cardinal/internal/ecs/bench_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ecs

import (
"testing"
"time"

"github.com/kelindar/bitmap"
)
Expand Down Expand Up @@ -529,8 +530,8 @@ func BenchmarkECS2_Iteration_GetSet(b *testing.B) {

w.Init()

_ = w.Tick()
_ = w.Tick()
_ = w.Tick(time.Now())
_ = w.Tick(time.Now())
}
})

Expand Down Expand Up @@ -565,8 +566,8 @@ func BenchmarkECS2_Iteration_GetSet(b *testing.B) {

w.Init()

_ = w.Tick()
_ = w.Tick()
_ = w.Tick(time.Now())
_ = w.Tick(time.Now())
}
})

Expand Down Expand Up @@ -617,8 +618,8 @@ func BenchmarkECS2_Iteration_GetSet(b *testing.B) {

w.Init()

_ = w.Tick()
_ = w.Tick()
_ = w.Tick(time.Now())
_ = w.Tick(time.Now())
}
})
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/cardinal/internal/ecs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ecs
import (
"sync"
"sync/atomic"
"time"

"slices"

Expand All @@ -20,6 +21,7 @@ type systemMetadata struct {
// It orders systems based on their component and system event dependencies and is optimized to
// maximize parallelism while maintaining correct order.
type systemScheduler struct {
phase string // The execution phase ("pre", "update", "post")
systems []systemMetadata // The systems to run
tier0 []int // The first execution tier
graph map[int][]int // Mapping of systems -> systems that depend on it
Expand All @@ -28,6 +30,11 @@ type systemScheduler struct {
// for each system. They alternate between runs to avoid reinitialization.
indegree0 []atomic.Int32
indegree1 []atomic.Int32
// onSystemRun is an optional callback for debug performance metrics.
// When set, it is called after each system execution with the phase, system name,
// start offset from tick start (ns), and execution duration (ns).
// When nil, system execution has zero instrumentation overhead.
onSystemRun func(phase, name string, startOffsetNs, durationNs int64)
}

// newSystemScheduler creates a new system scheduler.
Expand All @@ -47,7 +54,7 @@ func (s *systemScheduler) register(name string, systemDep bitmap.Bitmap, systemF

// Run executes the systems in the order of their dependencies. It returns an error if any system
// returns an error. If multiple systems fail, all errors are wrapped in a single error.
func (s *systemScheduler) Run() {
func (s *systemScheduler) Run(tickStart time.Time) {
// Fast path: no systems in hook.
if len(s.systems) == 0 {
return
Expand All @@ -68,7 +75,15 @@ func (s *systemScheduler) Run() {
for range s.systems {
systemID := <-executionQueue
wg.Go(func() {
s.systems[systemID].fn()
if s.onSystemRun != nil { // Set by the debug module to collect performance metrics.
startOffset := time.Since(tickStart).Nanoseconds()
start := time.Now()
s.systems[systemID].fn()
duration := time.Since(start).Nanoseconds()
s.onSystemRun(s.phase, s.systems[systemID].name, startOffset, duration)
} else {
s.systems[systemID].fn()
}

// Process all systems that depend on this one.
for _, dependent := range s.graph[systemID] {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cardinal/internal/ecs/scheduler_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestScheduler_RunFuzzConcurrent(t *testing.T) {
events[i] = struct{ start, end int64 }{}
}

scheduler.Run()
scheduler.Run(time.Now())

// Property: All systems execute exactly once.
for i, ev := range events {
Expand Down
16 changes: 14 additions & 2 deletions pkg/cardinal/internal/ecs/world.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ecs

import (
"time"

cardinalv1 "github.com/argus-labs/world-engine/proto/gen/go/worldengine/cardinal/v1"
)

Expand All @@ -24,8 +26,10 @@ func NewWorld() *World {
systemEvents: newSystemEventManager(),
}

phaseNames := [3]string{"pre", "update", "post"}
for i := range world.scheduler {
world.scheduler[i] = newSystemScheduler()
world.scheduler[i].phase = phaseNames[i]
}

return world
Expand All @@ -42,7 +46,7 @@ func (w *World) Init() {
// registered systems in order. If any system returns an error, the entire tick is considered
// failed, changes are discarded, and the error is returned. If the tick succeeds, the events
// emitted during the tick are returned.
func (w *World) Tick() error {
func (w *World) Tick(tickStart time.Time) error {
// Run init systems once on first tick.
if !w.initDone {
for _, system := range w.initSystems {
Expand All @@ -57,12 +61,20 @@ func (w *World) Tick() error {

// Run the systems.
for i := range w.scheduler {
w.scheduler[i].Run()
w.scheduler[i].Run(tickStart)
}

return nil
}

// SetOnSystemRun installs fn as the per-system callback on every phase
// scheduler of the world, replacing any callback set earlier. The callback
// receives the phase, the system name, the start offset in nanoseconds and the
// execution duration in nanoseconds. Passing nil clears the callback.
// Must be called before Init.
func (w *World) SetOnSystemRun(fn func(phase, name string, startOffsetNs, durationNs int64)) {
	for idx := 0; idx < len(w.scheduler); idx++ {
		w.scheduler[idx].onSystemRun = fn
	}
}

// OnComponentRegister stores callback in the world's onComponentRegister
// field, replacing any previously stored callback.
// NOTE(review): presumably the callback is invoked with the zero value of a
// component type when that component is registered — confirm against the
// registration code, which is not visible here.
func (w *World) OnComponentRegister(callback func(zero Component) error) {
w.onComponentRegister = callback
}
Expand Down
Loading
Loading