Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/cardinal/cardinal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/argus-labs/world-engine/pkg/cardinal/internal/command"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/ecs"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/event"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/performance"
"github.com/argus-labs/world-engine/pkg/cardinal/snapshot"
"github.com/argus-labs/world-engine/pkg/micro"
"github.com/argus-labs/world-engine/pkg/telemetry"
Expand Down Expand Up @@ -111,6 +112,16 @@ func NewWorld(opts WorldOptions) (*World, error) {
debug := newDebugModule(world)
debug.control.isPaused.Store(true)
world.debug = &debug

world.world.OnSystemRun(func(name string, hook ecs.SystemHook, startTime, endTime time.Time) {
world.debug.recordSpan(performance.TickSpan{
TickHeight: world.currentTick.height,
SystemName: name,
SystemHook: uint8(hook),
StartTime: startTime,
EndTime: endTime,
})
})
}

return world, nil
Expand Down Expand Up @@ -193,13 +204,16 @@ func (w *World) Tick(ctx context.Context, timestamp time.Time) error {
_ = w.commands.Drain()

w.currentTick.timestamp = timestamp
w.debug.startPerfTick()

// Tick ECS world.
err := w.world.Tick()
if err != nil {
return eris.Wrap(err, "one or more systems failed")
}

w.debug.recordTick(w.currentTick.height, timestamp)

// Emit events.
if err := w.events.Dispatch(); err != nil {
w.tel.Logger.Warn().Err(err).Msg("errors encountered dispatching events")
Expand Down Expand Up @@ -270,6 +284,7 @@ func (w *World) restore(ctx context.Context) error {

// Only update shard state after successful restoration and validation.
w.currentTick.height = snap.TickHeight + 1
w.debug.resetPerf()

return nil
}
Expand Down Expand Up @@ -313,6 +328,7 @@ func (w *World) reset() {
w.events.Clear()
w.currentTick.height = 0
w.currentTick.timestamp = time.Time{}
w.debug.resetPerf()
}

type Tick struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/cardinal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (opt *WorldOptions) apply(newOpt WorldOptions) {
if newOpt.ShardID != "" {
opt.ShardID = newOpt.ShardID
}
if newOpt.TickRate != 0.0 {
if newOpt.TickRate > 0.0 {
opt.TickRate = newOpt.TickRate
}
if newOpt.SnapshotStorageType.IsValid() {
Expand Down Expand Up @@ -78,8 +78,8 @@ func (opt *WorldOptions) validate() error {
if opt.ShardID == "" {
return eris.New("shard ID cannot be empty")
}
if opt.TickRate == 0.0 {
return eris.New("tick rate cannot be 0")
if opt.TickRate <= 0.0 {
return eris.New("tick rate must be greater than 0")
}
if !opt.SnapshotStorageType.IsValid() {
return eris.New("snapshot storage type must be specified")
Expand Down
143 changes: 143 additions & 0 deletions pkg/cardinal/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cardinal

import (
"context"
"math"
"net/http"
"sync/atomic"
"time"
Expand All @@ -14,11 +15,14 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/argus-labs/world-engine/pkg/cardinal/internal/performance"
"github.com/argus-labs/world-engine/pkg/cardinal/internal/schema"
cardinalv1 "github.com/argus-labs/world-engine/proto/gen/go/worldengine/cardinal/v1"
"github.com/argus-labs/world-engine/proto/gen/go/worldengine/cardinal/v1/cardinalv1connect"
)

const perfBatchIntervalSec = 1 // Target wall-clock seconds between perf batches.

// debugModule provides introspection and debugging capabilities for a World instance.
type debugModule struct {
world *World
Expand All @@ -28,12 +32,16 @@ type debugModule struct {
commands map[string]*structpb.Struct
events map[string]*structpb.Struct
components map[string]*structpb.Struct
perf *performance.Collector
}

var _ cardinalv1connect.DebugServiceHandler = (*debugModule)(nil)

// newDebugModule creates a new debugModule bound to the given World.
func newDebugModule(world *World) debugModule {
batchSize := max(int(math.Round(world.options.TickRate))*perfBatchIntervalSec, 1)
perf := performance.NewCollector(batchSize)

return debugModule{
world: world,
control: newTickControl(),
Expand All @@ -44,6 +52,7 @@ func newDebugModule(world *World) debugModule {
Anonymous: true, // Don't add $id based on package path
ExpandedStruct: true, // Inline the struct fields directly
},
perf: perf,
}
}

Expand Down Expand Up @@ -140,9 +149,143 @@ func (d *debugModule) Introspect(
Commands: d.buildTypeSchemas(d.commands),
Components: d.buildTypeSchemas(d.components),
Events: d.buildTypeSchemas(d.events),
TickRateHz: uint32(math.Round(d.world.options.TickRate)),
Schedules: d.buildSchedules(),
}), nil
}

// buildSchedules converts the ECS scheduler dependency graphs to proto messages.
func (d *debugModule) buildSchedules() []*cardinalv1.SystemSchedule {
ecsSchedules := d.world.world.Schedules()
schedules := make([]*cardinalv1.SystemSchedule, 0, len(ecsSchedules))
for _, s := range ecsSchedules {
if len(s.Systems) == 0 {
continue
}
nodes := make([]*cardinalv1.SystemNode, len(s.Systems))
for i, sys := range s.Systems {
depsOn := make([]uint32, len(sys.DependsOn))
for j, dep := range sys.DependsOn {
depsOn[j] = uint32(dep) //nolint:gosec // bounded by system count
}
nodes[i] = &cardinalv1.SystemNode{
Id: uint32(sys.ID), //nolint:gosec // bounded by system count
Name: sys.Name,
DependsOn: depsOn,
}
}
schedules = append(schedules, &cardinalv1.SystemSchedule{
Hook: ecsHookToProto(uint8(s.Hook)),
Systems: nodes,
})
}
return schedules
}

// -------------------------------------------------------------------------------------------------
// Performance
// -------------------------------------------------------------------------------------------------

func ecsHookToProto(hook uint8) cardinalv1.SystemHook {
mapping := [4]cardinalv1.SystemHook{
cardinalv1.SystemHook_SYSTEM_HOOK_PRE_UPDATE,
cardinalv1.SystemHook_SYSTEM_HOOK_UPDATE,
cardinalv1.SystemHook_SYSTEM_HOOK_POST_UPDATE,
cardinalv1.SystemHook_SYSTEM_HOOK_INIT,
}
if int(hook) < len(mapping) {
return mapping[hook]
}
return cardinalv1.SystemHook_SYSTEM_HOOK_UNSPECIFIED
}

// StreamPerf streams batches of per-tick timing data to connected clients.
func (d *debugModule) StreamPerf(
ctx context.Context,
_ *connect.Request[cardinalv1.StreamPerfRequest],
stream *connect.ServerStream[cardinalv1.PerfBatch],
) error {
ch := d.perf.Subscribe()
defer d.perf.Unsubscribe(ch)

for {
select {
case batch := <-ch:
proto := batchToProto(batch)
if err := stream.Send(proto); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}

func batchToProto(b performance.Batch) *cardinalv1.PerfBatch {
ticks := make([]*cardinalv1.TickTimeline, 0, len(b.Ticks))
for _, ts := range b.Ticks {
spans := make([]*cardinalv1.SystemSpan, 0, len(ts.Spans))
for _, span := range ts.Spans {
startOffset := span.StartTime.Sub(ts.TickStart).Nanoseconds()
duration := span.EndTime.Sub(span.StartTime).Nanoseconds()
if startOffset < 0 {
startOffset = 0
}
if duration < 0 {
duration = 0
}
spans = append(spans, &cardinalv1.SystemSpan{
SystemHook: ecsHookToProto(span.SystemHook),
System: span.SystemName,
StartOffsetNs: uint64(startOffset), //nolint:gosec // clamped to >= 0
DurationNs: uint64(duration), //nolint:gosec // clamped to >= 0
})
}
ticks = append(ticks, &cardinalv1.TickTimeline{
TickHeight: ts.TickHeight,
TickStart: timestamppb.New(ts.TickStart),
Spans: spans,
})
}
return &cardinalv1.PerfBatch{
DroppedSpans: b.DroppedSpans,
DroppedBatches: b.DroppedBatches,
Ticks: ticks,
}
}

// recordTick records a completed tick. Nil-safe.
func (d *debugModule) recordTick(tickHeight uint64, tickStart time.Time) {
if d == nil {
return
}
d.perf.RecordTick(tickHeight, tickStart)
}

// startPerfTick initializes span storage for a new tick. Nil-safe.
func (d *debugModule) startPerfTick() {
if d == nil {
return
}
d.perf.StartTick()
}

// resetPerf clears all buffered performance data. Nil-safe.
func (d *debugModule) resetPerf() {
if d == nil {
return
}
d.perf.Reset()
}

// recordSpan records a per-system span. Nil-safe.
func (d *debugModule) recordSpan(span performance.TickSpan) {
if d == nil {
return
}
d.perf.RecordSpan(span)
}

// buildTypeSchemas converts the internal schema cache to proto TypeSchema messages.
func (d *debugModule) buildTypeSchemas(cache map[string]*structpb.Struct) []*cardinalv1.TypeSchema {
schemas := make([]*cardinalv1.TypeSchema, 0, len(cache))
Expand Down
49 changes: 48 additions & 1 deletion pkg/cardinal/internal/ecs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ecs
import (
"sync"
"sync/atomic"
"time"

"slices"

Expand All @@ -20,6 +21,7 @@ type systemMetadata struct {
// It orders systems based on their component and system event dependencies and is optimized to
// maximize parallelism while maintaining correct order.
type systemScheduler struct {
systemHook SystemHook // The execution hook ("pre", "update", "post")
systems []systemMetadata // The systems to run
tier0 []int // The first execution tier
graph map[int][]int // Mapping of systems -> systems that depend on it
Expand All @@ -28,6 +30,10 @@ type systemScheduler struct {
// for each system. They alternate between runs to avoid reinitialization.
indegree0 []atomic.Int32
indegree1 []atomic.Int32
// onSystemRun is an optional callback for debug performance metrics.
// When set, it is called after each system execution with the system name, system hook,
// and the wall-clock start/end times. When nil, system execution has zero instrumentation overhead.
onSystemRun func(name string, hook SystemHook, startTime, endTime time.Time)
}

// newSystemScheduler creates a new system scheduler.
Expand Down Expand Up @@ -68,7 +74,14 @@ func (s *systemScheduler) Run() {
for range s.systems {
systemID := <-executionQueue
wg.Go(func() {
s.systems[systemID].fn()
if s.onSystemRun != nil { // Set by the debug module to collect performance metrics.
startTime := time.Now()
s.systems[systemID].fn()
endTime := time.Now()
s.onSystemRun(s.systems[systemID].name, s.systemHook, startTime, endTime)
} else {
s.systems[systemID].fn()
}

// Process all systems that depend on this one.
for _, dependent := range s.graph[systemID] {
Expand Down Expand Up @@ -99,6 +112,40 @@ func (s *systemScheduler) getCurrentAndNextIndegrees() ([]atomic.Int32, []atomic
return s.indegree1, s.indegree0
}

// SystemInfo describes a system and its dependencies for external introspection.
type SystemInfo struct {
ID int
Name string
DependsOn []int // IDs of systems that must complete before this one.
}

// ScheduleInfo describes the dependency graph for one execution phase.
type ScheduleInfo struct {
Hook SystemHook
Systems []SystemInfo
}

// scheduleInfo returns the dependency graph for introspection.
func (s *systemScheduler) scheduleInfo() ScheduleInfo {
// Invert the forward graph (system -> dependents) to get (system -> dependencies).
dependsOn := make(map[int][]int, len(s.systems))
for src, dsts := range s.graph {
for _, dst := range dsts {
dependsOn[dst] = append(dependsOn[dst], src)
}
}

systems := make([]SystemInfo, len(s.systems))
for i, sys := range s.systems {
systems[i] = SystemInfo{
ID: i,
Name: sys.name,
DependsOn: dependsOn[i],
}
}
return ScheduleInfo{Hook: s.systemHook, Systems: systems}
}

// createSchedule initializes the dependency graph and execution schedule for the systems.
// Must be called after all systems are registered and before the first Run.
func (s *systemScheduler) createSchedule() {
Expand Down
21 changes: 21 additions & 0 deletions pkg/cardinal/internal/ecs/world.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ecs

import (
"time"

cardinalv1 "github.com/argus-labs/world-engine/proto/gen/go/worldengine/cardinal/v1"
)

Expand All @@ -24,8 +26,10 @@ func NewWorld() *World {
systemEvents: newSystemEventManager(),
}

systemHookNames := [3]SystemHook{PreUpdate, Update, PostUpdate}
for i := range world.scheduler {
world.scheduler[i] = newSystemScheduler()
world.scheduler[i].systemHook = systemHookNames[i]
}

return world
Expand Down Expand Up @@ -63,6 +67,23 @@ func (w *World) Tick() error {
return nil
}

// SetOnSystemRun sets a callback invoked after each system execution.
// Must be called before Init.
func (w *World) OnSystemRun(fn func(name string, systemHook SystemHook, startTime, endTime time.Time)) {
for i := range w.scheduler {
w.scheduler[i].onSystemRun = fn
}
}

// Schedules returns the dependency graphs for all execution phases.
func (w *World) Schedules() []ScheduleInfo {
schedules := make([]ScheduleInfo, len(w.scheduler))
for i := range w.scheduler {
schedules[i] = w.scheduler[i].scheduleInfo()
}
return schedules
}

func (w *World) OnComponentRegister(callback func(zero Component) error) {
w.onComponentRegister = callback
}
Expand Down
Loading
Loading