25 commits
cb8a9a2  [cuebot/pycue/proto] Add OpenCue render farm monitoring system (ramonfigueiredo, Nov 25, 2025)
a51494a  [cuebot/sandbox] Add monitoring stack infrastructure and fix ElasticS… (ramonfigueiredo, Nov 25, 2025)
385561a  [docs] Add documentation for render farm monitoring system (ramonfigueiredo, Nov 25, 2025)
b40538f  [docs] Add render farm monitoring documentation and update nav ordering (ramonfigueiredo, Nov 25, 2025)
3285e41  [cuebot/docs/sandbox] Add Elasticsearch queue metric, enhance dashboa… (ramonfigueiredo, Nov 26, 2025)
0ed2b0f  [docs] Fix monitoring architecture diagram and remove non-existent Mo… (ramonfigueiredo, Nov 26, 2025)
5f763e6  [cuebot/sandbox] Remove redundant Kafka monitoring metrics from Prome… (ramonfigueiredo, Nov 27, 2025)
06fee7c  [cuebot/sandbox] Remove Elasticsearch queue metric from Prometheus (ramonfigueiredo, Nov 27, 2025)
6f17ca7  [sandbox] Add command-line parameters to load_test_jobs.py (ramonfigueiredo, Nov 27, 2025)
e0ec762  [cuebot] Add shot label to frame and job completion metrics (ramonfigueiredo, Nov 28, 2025)
1c68a97  [cuebot/sandbox] Add job core seconds histogram metric (ramonfigueiredo, Nov 28, 2025)
440ffa7  [cuebot/sandbox] Change frame histograms to layer-level max histograms (ramonfigueiredo, Nov 28, 2025)
f8c7ea9  [cuebot] Add Elasticsearch queries and complete event publishing for … (ramonfigueiredo, Nov 28, 2025)
b3cd2cf  [cuebot] Refactor Kafka event publisher setters for null safety (ramonfigueiredo, Nov 28, 2025)
3ea47e2  [cuebot] Remove try-catch blocks from monitoring code in FrameComplet… (ramonfigueiredo, Nov 28, 2025)
ae7e5b1  [cuebot] Refactor MonitoringEventBuilder to Spring-managed bean (ramonfigueiredo, Nov 29, 2025)
0a582c3  [cuebot/sandbox] Add pickup time tracking for frame dispatch metrics (ramonfigueiredo, Nov 29, 2025)
ba11dea  [sandbox] Reference guide for exploring OpenCue monitoring data in El… (ramonfigueiredo, Nov 29, 2025)
3c0686e  [cuebot] Fix exception handling in job completion metrics (ramonfigueiredo, Nov 29, 2025)
b8792e1  [cuebot/proto] Fix Elasticsearch timestamp mapping and use proto comp… (ramonfigueiredo, Nov 29, 2025)
2f26a91  [pycue] Make monitoring imports unconditional (ramonfigueiredo, Nov 29, 2025)
b690319  [monitoring] Remove HostReportEvent from Kafka/Elasticsearch pipeline (ramonfigueiredo, Nov 29, 2025)
b03689f  [cuebot] Add Kafka AdminClient for topic administration (ramonfigueiredo, Nov 30, 2025)
8f39cc3  [cuebot/rust/sandbox] Decouple Kafka consumer into standalone Rust in… (ramonfigueiredo, Nov 30, 2025)
56e7416  [docs] Update monitoring documentation (ramonfigueiredo, Nov 30, 2025)
1 change: 1 addition & 0 deletions .gitignore
@@ -32,4 +32,5 @@ docs/bin/
sandbox/kafka-data
sandbox/zookeeper-data
sandbox/zookeeper-logs
sandbox/rqd/shots/
docs/_data/version.yml
2 changes: 1 addition & 1 deletion VERSION.in
@@ -1 +1 @@
-1.14
+1.15
17 changes: 15 additions & 2 deletions cuebot/build.gradle
@@ -26,6 +26,8 @@ repositories {
def grpcVersion = '1.47.0'
def protobufVersion = '3.21.2'
def activemqVersion = '5.12.0'
def kafkaVersion = '3.4.0'
def elasticsearchVersion = '8.8.0'

// Spring dependency versions are managed by the io.spring.dependency-management plugin.
// Appropriate versions will be pulled based on the spring boot version specified in the
@@ -52,6 +54,15 @@ dependencies {
implementation group: 'io.prometheus', name: 'simpleclient', version: '0.16.0'
implementation group: 'io.prometheus', name: 'simpleclient_servlet', version: '0.16.0'

// Kafka for event publishing
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafkaVersion}"
implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.9.0'

// Elasticsearch for historical data storage
implementation group: 'co.elastic.clients', name: 'elasticsearch-java', version: "${elasticsearchVersion}"
implementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: "${elasticsearchVersion}"
implementation group: 'jakarta.json', name: 'jakarta.json-api', version: '2.1.1'

protobuf files("../proto/src/")

testImplementation group: 'junit', name: 'junit', version: '4.12'
@@ -67,12 +78,14 @@ dependencies {

compileJava {
dependsOn generateProto
-options.compilerArgs << "-Xlint:all" << "-Werror"
+// Exclude serial warning due to protobuf-generated code warnings
+options.compilerArgs << "-Xlint:all,-serial" << "-Werror"
}

compileTestJava {
dependsOn generateProto
-options.compilerArgs << "-Xlint:all" << "-Werror"
+// Exclude serial warning due to protobuf-generated code warnings
+options.compilerArgs << "-Xlint:all,-serial" << "-Werror"
}

protobuf {
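The elasticsearch-java and elasticsearch-rest-client dependencies added above provide the client Cuebot uses for historical data storage. A minimal sketch of opening that client, assuming an unsecured local Elasticsearch at localhost:9200 (the endpoint and security settings are placeholders, not values from this PR):

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;

public class ElasticsearchClientSketch {
    public static void main(String[] args) throws Exception {
        // Low-level REST client pointed at an assumed local cluster.
        RestClient restClient =
                RestClient.builder(new HttpHost("localhost", 9200, "http")).build();

        // elasticsearch-java wraps the REST client with a JSON mapper
        // (Jackson is assumed to be on the classpath, as it is with Spring Boot).
        ElasticsearchTransport transport =
                new RestClientTransport(restClient, new JacksonJsonpMapper());
        ElasticsearchClient client = new ElasticsearchClient(transport);

        // Simple connectivity check before indexing any monitoring documents.
        System.out.println("Elasticsearch reachable: " + client.ping().value());

        transport.close();
    }
}
```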
CuebotApplication.java
@@ -24,8 +24,9 @@
import org.apache.logging.log4j.LogManager;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

-@SpringBootApplication
+@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
public class CuebotApplication extends SpringApplication {
private static String[] checkArgs(String[] args) {
Optional<String> deprecatedFlag = Arrays.stream(args)
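Excluding KafkaAutoConfiguration keeps Spring Boot from auto-configuring producer/consumer beans from application properties, presumably so the monitoring context (applicationContext-monitoring.xml, added to AppConfig below) can wire its own KafkaEventPublisher explicitly. For orientation only, a minimal hand-configured producer for protobuf payloads could look like this; the broker address and topic name are placeholders, not values taken from this PR:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ManualProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // A protobuf event (e.g. FrameEvent) would be serialized with toByteArray()
            // before being sent; an empty payload stands in for it here.
            byte[] payload = new byte[0];
            producer.send(new ProducerRecord<>("opencue-frame-events", "frame-id", payload)); // assumed topic
            producer.flush();
        }
    }
}
```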
ExecutionSummary.java
@@ -28,6 +28,15 @@ public class ExecutionSummary {
public long gpuTimeSuccess;
public long gpuTimeFail;
public long highMemoryKb;
public int highFrameSec;

public int getHighFrameSec() {
return highFrameSec;
}

public void setHighFrameSec(int highFrameSec) {
this.highFrameSec = highFrameSec;
}

public long getHighMemoryKb() {
return highMemoryKb;
PrometheusMetricsCollector.java
@@ -119,6 +119,38 @@ public class PrometheusMetricsCollector {
.labelNames("env", "cuebot_host", "render_node", "job_name", "frame_name", "frame_id")
.register();

private static final Counter frameCompletedCounter = Counter.build()
.name("cue_frames_completed_total").help("Total number of frames completed")
.labelNames("env", "cuebot_host", "state", "show", "shot").register();

private static final Counter jobCompletedCounter =
Counter.build().name("cue_jobs_completed_total").help("Total number of jobs completed")
.labelNames("env", "cuebot_host", "state", "show", "shot").register();

private static final Histogram jobCoreSecondsHistogram = Histogram.build()
.name("cue_job_core_seconds").help("Histogram of total core seconds per job")
.labelNames("env", "cuebot_host", "show", "shot")
.buckets(3600, 36000, 360000, 3600000, 36000000).register();

private static final Histogram layerMaxRuntimeHistogram =
Histogram.build().name("cue_layer_max_runtime_seconds")
.help("Histogram of max frame runtime per layer in seconds")
.labelNames("env", "cuebot_host", "show", "shot", "layer_type")
.buckets(60, 300, 600, 1800, 3600, 7200, 14400, 28800, 86400).register();

private static final Histogram layerMaxMemoryHistogram =
Histogram.build().name("cue_layer_max_memory_bytes")
.help("Histogram of max frame memory usage per layer in bytes")
.labelNames("env", "cuebot_host", "show", "shot", "layer_type")
.buckets(256L * 1024 * 1024, 512L * 1024 * 1024, 1024L * 1024 * 1024,
2048L * 1024 * 1024, 4096L * 1024 * 1024, 8192L * 1024 * 1024,
16384L * 1024 * 1024, 32768L * 1024 * 1024)
.register();

private static final Counter hostReportsReceivedCounter = Counter.build()
.name("cue_host_reports_received_total").help("Total number of host reports received")
.labelNames("env", "cuebot_host", "facility").register();

private String deployment_environment;
private String cuebot_host;

@@ -269,6 +301,82 @@ public void incrementFrameKillFailureCounter(String hostname, String jobName, St
jobName, frameName, frameId).inc();
}

/**
* Record a frame completion
*
* @param state final state of the frame
* @param show show name
* @param shot shot name
*/
public void recordFrameCompleted(String state, String show, String shot) {
frameCompletedCounter
.labels(this.deployment_environment, this.cuebot_host, state, show, shot).inc();
}

/**
* Record a job completion
*
* @param state final state of the job
* @param show show name
* @param shot shot name
*/
public void recordJobCompleted(String state, String show, String shot) {
jobCompletedCounter.labels(this.deployment_environment, this.cuebot_host, state, show, shot)
.inc();
}

/**
* Record job total core seconds for histogramming
*
* @param coreSeconds total core seconds consumed by the job
* @param show show name
* @param shot shot name
*/
public void recordJobCoreSeconds(double coreSeconds, String show, String shot) {
jobCoreSecondsHistogram.labels(this.deployment_environment, this.cuebot_host, show, shot)
.observe(coreSeconds);
}

/**
* Record layer max runtime for histogramming
*
* @param runtimeSeconds max runtime in seconds for the layer
* @param show show name
* @param shot shot name
* @param layerType layer type
*/
public void recordLayerMaxRuntime(double runtimeSeconds, String show, String shot,
String layerType) {
layerMaxRuntimeHistogram
.labels(this.deployment_environment, this.cuebot_host, show, shot, layerType)
.observe(runtimeSeconds);
}

/**
* Record layer max memory usage for histogramming
*
* @param memoryBytes max memory in bytes for the layer
* @param show show name
* @param shot shot name
* @param layerType layer type
*/
public void recordLayerMaxMemory(double memoryBytes, String show, String shot,
String layerType) {
layerMaxMemoryHistogram
.labels(this.deployment_environment, this.cuebot_host, show, shot, layerType)
.observe(memoryBytes);
}

/**
* Record a host report received
*
* @param facility facility name
*/
public void recordHostReport(String facility) {
hostReportsReceivedCounter.labels(this.deployment_environment, this.cuebot_host, facility)
.inc();
}

// Setters used for dependency injection
public void setBookingQueue(BookingQueue bookingQueue) {
this.bookingQueue = bookingQueue;
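For orientation, a standalone sketch of what one of the new histograms records, using the same io.prometheus simpleclient API declared in build.gradle; the registry, label values, and the 8 core-hour job below are invented sample data, not anything this PR ships:

```java
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Histogram;

public class JobCoreSecondsExample {
    public static void main(String[] args) {
        CollectorRegistry registry = new CollectorRegistry();

        // Same shape as cue_job_core_seconds above: buckets from 1 core-hour (3,600 s)
        // up to 10,000 core-hours (36,000,000 s).
        Histogram jobCoreSeconds = Histogram.build().name("cue_job_core_seconds")
                .help("Histogram of total core seconds per job")
                .labelNames("env", "cuebot_host", "show", "shot")
                .buckets(3600, 36000, 360000, 3600000, 36000000).register(registry);

        // A hypothetical job that consumed 8 core-hours (28,800 core-seconds)
        // lands in the <= 36,000 s bucket.
        jobCoreSeconds.labels("dev", "cuebot01", "myshow", "sh010").observe(8 * 3600);

        Double bucketCount = registry.getSampleValue("cue_job_core_seconds_bucket",
                new String[] {"env", "cuebot_host", "show", "shot", "le"},
                new String[] {"dev", "cuebot01", "myshow", "sh010", "36000.0"});
        System.out.println("jobs <= 36000 core-seconds: " + bucketCount);
    }
}
```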
AppConfig.java
@@ -38,7 +38,8 @@
"classpath:conf/spring/applicationContext-grpcServer.xml",
"classpath:conf/spring/applicationContext-service.xml",
"classpath:conf/spring/applicationContext-jms.xml",
-"classpath:conf/spring/applicationContext-criteria.xml"})
+"classpath:conf/spring/applicationContext-criteria.xml",
+"classpath:conf/spring/applicationContext-monitoring.xml"})
@EnableConfigurationProperties
@PropertySource({"classpath:opencue.properties"})
public class AppConfig {
DependDao.java
@@ -194,6 +194,14 @@ public interface DependDao {
*/
boolean decrementDependCount(FrameInterface f);

/**
* Check if a frame is dispatchable (has depend_count = 0).
*
* @param f the frame to check
* @return true if the frame's depend_count is 0
*/
boolean isFrameDispatchable(FrameInterface f);

/**
* Returns true if this is the thread that set the depend to inactive.
*
DependDaoJdbc.java
@@ -215,6 +215,15 @@ public boolean decrementDependCount(FrameInterface f) {
return getJdbcTemplate().update(DECREMENT_DEPEND_COUNT, f.getFrameId()) == 1;
}

private static final String IS_FRAME_DISPATCHABLE =
"SELECT int_depend_count = 0 FROM frame WHERE pk_frame = ?";

@Override
public boolean isFrameDispatchable(FrameInterface f) {
return Boolean.TRUE.equals(getJdbcTemplate().queryForObject(IS_FRAME_DISPATCHABLE,
Boolean.class, f.getFrameId()));
}

private static final String[] DELETE_DEPEND =
{"DELETE FROM depend WHERE pk_parent=?", "DELETE FROM depend WHERE pk_depend=?"};

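IS_FRAME_DISPATCHABLE resolves the boolean directly in SQL rather than fetching the count and comparing in Java. A small self-contained sketch of the same query shape against an in-memory H2 table (H2, spring-jdbc, and the cut-down frame schema here are assumptions made for the sketch, not anything added by this PR):

```java
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

public class IsFrameDispatchableSketch {
    public static void main(String[] args) {
        // In-memory H2 stand-in for the real frame table.
        DriverManagerDataSource ds =
                new DriverManagerDataSource("jdbc:h2:mem:frames;DB_CLOSE_DELAY=-1");
        JdbcTemplate jdbc = new JdbcTemplate(ds);
        jdbc.execute("CREATE TABLE frame (pk_frame VARCHAR(36) PRIMARY KEY, int_depend_count INT)");
        jdbc.update("INSERT INTO frame VALUES ('frame-1', 0)");

        // Same query shape as IS_FRAME_DISPATCHABLE above: the comparison yields a boolean row.
        Boolean dispatchable = jdbc.queryForObject(
                "SELECT int_depend_count = 0 FROM frame WHERE pk_frame = ?",
                Boolean.class, "frame-1");
        System.out.println("dispatchable: " + Boolean.TRUE.equals(dispatchable));
    }
}
```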
LayerDaoJdbc.java
@@ -409,12 +409,13 @@ public FrameStateTotals mapRow(ResultSet rs, int rowNum) throws SQLException {
}, layer.getLayerId());
}

-private static final String GET_EXECUTION_SUMMARY = "SELECT "
-+ "layer_usage.int_core_time_success," + "layer_usage.int_core_time_fail,"
-+ "layer_usage.int_gpu_time_success," + "layer_usage.int_gpu_time_fail,"
-+ "layer_usage.int_clock_time_success," + "layer_mem.int_max_rss " + "FROM " + "layer,"
-+ "layer_usage, " + "layer_mem " + "WHERE " + "layer.pk_layer = layer_usage.pk_layer "
-+ "AND " + "layer.pk_layer = layer_mem.pk_layer " + "AND " + "layer.pk_layer = ?";
+private static final String GET_EXECUTION_SUMMARY =
+"SELECT " + "layer_usage.int_core_time_success," + "layer_usage.int_core_time_fail,"
++ "layer_usage.int_gpu_time_success," + "layer_usage.int_gpu_time_fail,"
++ "layer_usage.int_clock_time_success," + "layer_usage.int_clock_time_high,"
++ "layer_mem.int_max_rss " + "FROM " + "layer," + "layer_usage, " + "layer_mem "
++ "WHERE " + "layer.pk_layer = layer_usage.pk_layer " + "AND "
++ "layer.pk_layer = layer_mem.pk_layer " + "AND " + "layer.pk_layer = ?";

@Override
public ExecutionSummary getExecutionSummary(LayerInterface layer) {
@@ -429,6 +430,7 @@ public ExecutionSummary mapRow(ResultSet rs, int rowNum) throws SQLException {
e.gpuTimeFail = rs.getLong("int_gpu_time_fail");
e.gpuTime = e.gpuTimeSuccess + e.gpuTimeFail;
e.highMemoryKb = rs.getLong("int_max_rss");
e.highFrameSec = rs.getInt("int_clock_time_high");
return e;
}
}, layer.getLayerId());
DispatchSupportService.java
@@ -54,7 +54,12 @@
import com.imageworks.spcue.grpc.host.ThreadMode;
import com.imageworks.spcue.grpc.job.CheckpointState;
import com.imageworks.spcue.grpc.job.FrameState;
import com.imageworks.spcue.grpc.monitoring.EventType;
import com.imageworks.spcue.grpc.monitoring.FrameEvent;
import com.imageworks.spcue.grpc.monitoring.ProcEvent;
import com.imageworks.spcue.grpc.rqd.RunFrame;
import com.imageworks.spcue.monitoring.KafkaEventPublisher;
import com.imageworks.spcue.monitoring.MonitoringEventBuilder;
import com.imageworks.spcue.rqd.RqdClient;
import com.imageworks.spcue.service.BookingManager;
import com.imageworks.spcue.service.DependManager;
@@ -77,6 +82,8 @@ public class DispatchSupportService implements DispatchSupport {
private RedirectManager redirectManager;
private BookingManager bookingManager;
private BookingDao bookingDao;
private KafkaEventPublisher kafkaEventPublisher;
private MonitoringEventBuilder monitoringEventBuilder;

private ConcurrentHashMap<String, StrandedCores> strandedCores =
new ConcurrentHashMap<String, StrandedCores>();
@@ -216,9 +223,15 @@ public void runFrame(VirtualProc proc, DispatchFrame frame) {
public void startFrameAndProc(VirtualProc proc, DispatchFrame frame) {
logger.trace("starting frame: " + frame);

// Capture previous state before update for event publishing
FrameState previousState = frame.state;

frameDao.updateFrameStarted(proc, frame);

reserveProc(proc, frame);

// Publish FRAME_STARTED event (WAITING -> RUNNING transition)
publishFrameStartedEvent(frame, proc, previousState);
}

@Transactional(propagation = Propagation.REQUIRED, readOnly = true)
@@ -460,6 +473,7 @@ private void reserveProc(VirtualProc proc, DispatchFrame frame) {
if (proc.isNew()) {
logger.info("creating proc " + proc.getName() + " for " + frame.getName());
procDao.insertVirtualProc(proc);
publishProcEvent(EventType.PROC_BOOKED, proc);
} else {
logger.info("updated proc " + proc.getName() + " for " + frame.getName());
procDao.updateVirtualProcAssignment(proc);
@@ -481,6 +495,7 @@ public void unbookProc(VirtualProc proc, String reason) {
}
proc.unbooked = true;
procDao.deleteVirtualProc(proc);
publishProcEvent(EventType.PROC_UNBOOKED, proc);
DispatchSupport.unbookedProcs.getAndIncrement();
logger.info(proc + " " + reason);

@@ -680,4 +695,42 @@ public void setBookingDao(BookingDao bookingDao) {
public void clearCache() {
dispatcherDao.clearCache();
}

public KafkaEventPublisher getKafkaEventPublisher() {
return kafkaEventPublisher;
}

public void setKafkaEventPublisher(KafkaEventPublisher kafkaEventPublisher) {
this.kafkaEventPublisher = kafkaEventPublisher;
}

public void setMonitoringEventBuilder(MonitoringEventBuilder monitoringEventBuilder) {
this.monitoringEventBuilder = monitoringEventBuilder;
}

/**
* Publishes a proc event to Kafka for monitoring purposes.
*/
private void publishProcEvent(EventType eventType, VirtualProc proc) {
if (kafkaEventPublisher == null || !kafkaEventPublisher.isEnabled()) {
return;
}

ProcEvent event = monitoringEventBuilder.buildProcEvent(eventType, proc);
kafkaEventPublisher.publishProcEvent(event);
}

/**
* Publishes a frame started event to Kafka for monitoring purposes. This captures the WAITING
* -> RUNNING transition for pickup time analysis.
*/
private void publishFrameStartedEvent(DispatchFrame frame, VirtualProc proc,
FrameState previousState) {
if (kafkaEventPublisher == null || !kafkaEventPublisher.isEnabled()) {
return;
}

FrameEvent event = monitoringEventBuilder.buildFrameStartedEvent(frame, proc);
kafkaEventPublisher.publishFrameEvent(event);
}
}
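publishFrameStartedEvent exists so the pipeline can capture the WAITING -> RUNNING transition for pickup time analysis, i.e. how long a dispatchable frame waited before a proc picked it up. A toy illustration of what that analysis computes downstream (the timestamps are invented, and the real computation lives in the monitoring pipeline, not in this class):

```java
import java.time.Duration;
import java.time.Instant;

public class PickupTimeSketch {
    public static void main(String[] args) {
        // Hypothetical timestamps: when the frame became dispatchable (WAITING)
        // and when startFrameAndProc transitioned it to RUNNING.
        Instant becameWaiting = Instant.parse("2025-11-29T10:00:00Z");
        Instant started = Instant.parse("2025-11-29T10:03:30Z");

        // Pickup time is the gap between those two transitions: 210 seconds here.
        long pickupSeconds = Duration.between(becameWaiting, started).getSeconds();
        System.out.println("pickup_time_seconds=" + pickupSeconds);
    }
}
```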