Skip to content

Commit 345ef66

Browse files
[cuebot/docs/sandbox] Add Elasticsearch queue metric, enhance dashboard, and improve docs
- Implement cue_elasticsearch_index_queue_size Prometheus metric using @Autowired for ElasticsearchClient in PrometheusMetricsCollector
- Update Grafana dashboard panel colors and labels:
  - Frames Completed: DEAD (red), SUCCEEDED (green), WAITING (yellow)
  - Events Published: human-readable labels with consistent colors
- Add monitoring documentation screenshots for all components:
  - Grafana, Prometheus, Kafka UI, Elasticsearch, Kibana
- Update all monitoring docs (Quick Start, Concepts, User Guides, Reference, Tutorials, Developer Guide) with visual references
- Add load_test_jobs.py script for generating test monitoring data
- Update monitor_events.py consumer script
1 parent b40538f commit 345ef66

25 files changed

+836
-155
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,5 @@ docs/bin/
3232
sandbox/kafka-data
3333
sandbox/zookeeper-data
3434
sandbox/zookeeper-logs
35+
sandbox/rqd/shots/
3536
docs/_data/version.yml

cuebot/src/main/java/com/imageworks/spcue/PrometheusMetricsCollector.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.imageworks.spcue.dispatcher.DispatchQueue;
99
import com.imageworks.spcue.dispatcher.HostReportHandler;
1010
import com.imageworks.spcue.dispatcher.HostReportQueue;
11+
import com.imageworks.spcue.monitoring.ElasticsearchClient;
1112

1213
import io.prometheus.client.Counter;
1314
import io.prometheus.client.Gauge;
@@ -26,6 +27,9 @@ public class PrometheusMetricsCollector {
2627

2728
private HostReportQueue reportQueue;
2829

30+
@Autowired(required = false)
31+
private ElasticsearchClient elasticsearchClient;
32+
2933
private boolean enabled;
3034

3135
// BookingQueue bookingQueue
@@ -265,6 +269,12 @@ public void collectPrometheusMetrics() {
265269
.set(reportQueue.getTaskCount());
266270
reportQueueRejectedTotal.labels(this.deployment_environment, this.cuebot_host)
267271
.set(reportQueue.getRejectedTaskCount());
272+
273+
// ElasticsearchClient queue
274+
if (elasticsearchClient != null) {
275+
elasticsearchIndexQueueSize.labels(this.deployment_environment, this.cuebot_host)
276+
.set(elasticsearchClient.getPendingIndexCount());
277+
}
268278
}
269279
}
270280

@@ -425,4 +435,8 @@ public void setDispatchQueue(DispatchQueue dispatchQueue) {
425435
public void setReportQueue(HostReportQueue reportQueue) {
426436
this.reportQueue = reportQueue;
427437
}
438+
439+
public void setElasticsearchClient(ElasticsearchClient elasticsearchClient) {
440+
this.elasticsearchClient = elasticsearchClient;
441+
}
428442
}

cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import com.imageworks.spcue.grpc.monitoring.EventType;
6262
import com.imageworks.spcue.grpc.monitoring.FrameEvent;
6363
import com.imageworks.spcue.grpc.monitoring.JobEvent;
64+
import com.imageworks.spcue.PrometheusMetricsCollector;
6465

6566
/**
6667
* The FrameCompleteHandler encapsulates all logic necessary for processing FrameComplete reports
@@ -90,6 +91,7 @@ public class FrameCompleteHandler {
9091
private Environment env;
9192
private KafkaEventPublisher kafkaEventPublisher;
9293
private MonitoringEventBuilder monitoringEventBuilder;
94+
private PrometheusMetricsCollector prometheusMetrics;
9395

9496
/*
9597
* The last time a proc was unbooked for subscription or job balancing. Since there are so many
@@ -744,12 +746,35 @@ public void setKafkaEventPublisher(KafkaEventPublisher kafkaEventPublisher) {
744746
}
745747
}
746748

749+
public PrometheusMetricsCollector getPrometheusMetrics() {
750+
return prometheusMetrics;
751+
}
752+
753+
public void setPrometheusMetrics(PrometheusMetricsCollector prometheusMetrics) {
754+
this.prometheusMetrics = prometheusMetrics;
755+
}
756+
747757
/**
748758
* Publishes a frame complete event to Kafka for monitoring purposes. This method is called
749759
* asynchronously to avoid blocking the dispatch thread.
750760
*/
751761
private void publishFrameCompleteEvent(FrameCompleteReport report, DispatchFrame frame,
752762
FrameDetail frameDetail, FrameState newFrameState, VirtualProc proc) {
763+
// Record Prometheus metrics for frame completion
764+
if (prometheusMetrics != null) {
765+
try {
766+
prometheusMetrics.recordFrameCompleted(newFrameState.name(), frame.show);
767+
prometheusMetrics.recordFrameRuntime(report.getRunTime(), frame.show, "render");
768+
if (report.getFrame().getMaxRss() > 0) {
769+
prometheusMetrics.recordFrameMemory(report.getFrame().getMaxRss() * 1024L,
770+
frame.show, "render");
771+
}
772+
} catch (Exception e) {
773+
logger.trace("Failed to record Prometheus metrics: {}", e.getMessage());
774+
}
775+
}
776+
777+
// Publish to Kafka if enabled
753778
if (kafkaEventPublisher == null || !kafkaEventPublisher.isEnabled()) {
754779
return;
755780
}

cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,12 @@ public void handleHostReport(HostReport report, boolean isBoot) {
171171
// Publish host report event to Kafka for monitoring
172172
publishHostReportEvent(report, isBoot);
173173

174+
// Record Prometheus metric for host report
175+
if (prometheusMetrics != null) {
176+
String facility = report.getHost().getFacility();
177+
prometheusMetrics.recordHostReport(facility != null ? facility : "unknown");
178+
}
179+
174180
long swapOut = 0;
175181
if (report.getHost().getAttributesMap().containsKey("swapout")) {
176182
swapOut = Integer.parseInt(report.getHost().getAttributesMap().get("swapout"));

cuebot/src/main/java/com/imageworks/spcue/monitoring/KafkaEventPublisher.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.imageworks.spcue.grpc.monitoring.LayerEvent;
4747
import com.imageworks.spcue.grpc.monitoring.ProcEvent;
4848
import com.imageworks.spcue.util.CueExceptionUtil;
49+
import com.imageworks.spcue.PrometheusMetricsCollector;
4950

5051
/**
5152
* KafkaEventPublisher publishes monitoring events to Kafka topics for downstream processing. Events
@@ -77,6 +78,7 @@ public class KafkaEventPublisher extends ThreadPoolExecutor {
7778
private JsonFormat.Printer jsonPrinter;
7879
private String sourceCuebot;
7980
private boolean enabled = false;
81+
private PrometheusMetricsCollector prometheusMetrics;
8082

8183
public KafkaEventPublisher() {
8284
super(THREAD_POOL_SIZE_INITIAL, THREAD_POOL_SIZE_MAX, 10, TimeUnit.SECONDS,
@@ -171,7 +173,8 @@ public EventHeader.Builder createEventHeader(EventType eventType, String correla
171173
public void publishJobEvent(JobEvent event) {
172174
if (!enabled)
173175
return;
174-
publishEvent(TOPIC_JOB_EVENTS, event.getJobId(), event);
176+
publishEvent(TOPIC_JOB_EVENTS, event.getJobId(), event,
177+
event.getHeader().getEventType().name());
175178
}
176179

177180
/**
@@ -180,7 +183,8 @@ public void publishJobEvent(JobEvent event) {
180183
public void publishLayerEvent(LayerEvent event) {
181184
if (!enabled)
182185
return;
183-
publishEvent(TOPIC_LAYER_EVENTS, event.getLayerId(), event);
186+
publishEvent(TOPIC_LAYER_EVENTS, event.getLayerId(), event,
187+
event.getHeader().getEventType().name());
184188
}
185189

186190
/**
@@ -189,7 +193,8 @@ public void publishLayerEvent(LayerEvent event) {
189193
public void publishFrameEvent(FrameEvent event) {
190194
if (!enabled)
191195
return;
192-
publishEvent(TOPIC_FRAME_EVENTS, event.getFrameId(), event);
196+
publishEvent(TOPIC_FRAME_EVENTS, event.getFrameId(), event,
197+
event.getHeader().getEventType().name());
193198
}
194199

195200
/**
@@ -198,7 +203,8 @@ public void publishFrameEvent(FrameEvent event) {
198203
public void publishHostEvent(HostEvent event) {
199204
if (!enabled)
200205
return;
201-
publishEvent(TOPIC_HOST_EVENTS, event.getHostName(), event);
206+
publishEvent(TOPIC_HOST_EVENTS, event.getHostName(), event,
207+
event.getHeader().getEventType().name());
202208
}
203209

204210
/**
@@ -207,7 +213,8 @@ public void publishHostEvent(HostEvent event) {
207213
public void publishHostReportEvent(HostReportEvent event) {
208214
if (!enabled)
209215
return;
210-
publishEvent(TOPIC_HOST_REPORTS, event.getHostName(), event);
216+
publishEvent(TOPIC_HOST_REPORTS, event.getHostName(), event,
217+
event.getHeader().getEventType().name());
211218
}
212219

213220
/**
@@ -216,13 +223,19 @@ public void publishHostReportEvent(HostReportEvent event) {
216223
public void publishProcEvent(ProcEvent event) {
217224
if (!enabled)
218225
return;
219-
publishEvent(TOPIC_PROC_EVENTS, event.getProcId(), event);
226+
publishEvent(TOPIC_PROC_EVENTS, event.getProcId(), event,
227+
event.getHeader().getEventType().name());
220228
}
221229

222230
/**
223231
* Internal method to publish any protobuf message to a Kafka topic.
224232
*/
225-
private void publishEvent(String topic, String key, Message event) {
233+
private void publishEvent(String topic, String key, Message event, String eventType) {
234+
// Update queue size metric
235+
if (prometheusMetrics != null) {
236+
prometheusMetrics.setMonitoringEventQueueSize(getQueue().size());
237+
}
238+
226239
try {
227240
execute(() -> {
228241
try {
@@ -237,6 +250,10 @@ private void publishEvent(String topic, String key, Message event) {
237250
} else {
238251
logger.trace("Published event to {}, partition={}, offset={}", topic,
239252
metadata.partition(), metadata.offset());
253+
// Record successful publish
254+
if (prometheusMetrics != null) {
255+
prometheusMetrics.incrementMonitoringEventPublished(eventType);
256+
}
240257
}
241258
});
242259
} catch (Exception e) {
@@ -246,6 +263,10 @@ private void publishEvent(String topic, String key, Message event) {
246263
});
247264
} catch (RejectedExecutionException e) {
248265
logger.warn("Event queue is full, dropping event for topic {}", topic);
266+
// Record dropped event
267+
if (prometheusMetrics != null) {
268+
prometheusMetrics.incrementMonitoringEventDropped(eventType);
269+
}
249270
}
250271
}
251272

@@ -269,4 +290,11 @@ public String getSourceCuebot() {
269290
public int getPendingEventCount() {
270291
return getQueue().size();
271292
}
293+
294+
/**
295+
* Sets the Prometheus metrics collector for recording monitoring metrics.
296+
*/
297+
public void setPrometheusMetrics(PrometheusMetricsCollector prometheusMetrics) {
298+
this.prometheusMetrics = prometheusMetrics;
299+
}
272300
}

cuebot/src/main/java/com/imageworks/spcue/service/JobManagerSupport.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import com.imageworks.spcue.rqd.RqdClient;
5050
import com.imageworks.spcue.util.CueExceptionUtil;
5151
import com.imageworks.spcue.util.FrameSet;
52+
import com.imageworks.spcue.PrometheusMetricsCollector;
53+
import com.imageworks.spcue.dao.ShowDao;
5254

5355
/**
5456
* A non-transaction support class for managing jobs.
@@ -66,6 +68,8 @@ public class JobManagerSupport {
6668
private RedirectManager redirectManager;
6769
private EmailSupport emailSupport;
6870
private FrameSearchFactory frameSearchFactory;
71+
private PrometheusMetricsCollector prometheusMetrics;
72+
private ShowDao showDao;
6973

7074
public void queueShutdownJob(JobInterface job, Source source, boolean isManualKill) {
7175
manageQueue.execute(new DispatchJobComplete(job, source, isManualKill, this));
@@ -150,6 +154,17 @@ public boolean shutdownJob(JobInterface job, Source source, boolean isManualKill
150154
*/
151155
emailSupport.sendShutdownEmail(job);
152156

157+
// Record job completion metric
158+
if (prometheusMetrics != null && showDao != null) {
159+
try {
160+
String showName = showDao.getShowDetail(job.getShowId()).getName();
161+
String state = isManualKill ? "KILLED" : "FINISHED";
162+
prometheusMetrics.recordJobCompleted(state, showName);
163+
} catch (Exception e) {
164+
logger.warn("Failed to record job completion metric: " + e.getMessage());
165+
}
166+
}
167+
153168
return true;
154169
}
155170
}
@@ -593,4 +608,20 @@ public FrameSearchFactory getFrameSearchFactory() {
593608
public void setFrameSearchFactory(FrameSearchFactory frameSearchFactory) {
594609
this.frameSearchFactory = frameSearchFactory;
595610
}
611+
612+
public PrometheusMetricsCollector getPrometheusMetrics() {
613+
return prometheusMetrics;
614+
}
615+
616+
public void setPrometheusMetrics(PrometheusMetricsCollector prometheusMetrics) {
617+
this.prometheusMetrics = prometheusMetrics;
618+
}
619+
620+
public ShowDao getShowDao() {
621+
return showDao;
622+
}
623+
624+
public void setShowDao(ShowDao showDao) {
625+
this.showDao = showDao;
626+
}
596627
}

cuebot/src/main/resources/conf/spring/applicationContext-monitoring.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
<bean id="kafkaEventPublisher"
3333
class="com.imageworks.spcue.monitoring.KafkaEventPublisher"
3434
init-method="initialize"
35-
destroy-method="shutdown"/>
35+
destroy-method="shutdown">
36+
<property name="prometheusMetrics" ref="prometheusMetricsCollector"/>
37+
</bean>
3638

3739
<!--
3840
Monitoring Event Builder helper for creating event messages from domain objects.

cuebot/src/main/resources/conf/spring/applicationContext-service.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@
248248
<property name="redirectManager" ref="redirectManager" />
249249
<property name="emailSupport" ref="emailSupport"/>
250250
<property name="frameSearchFactory" ref="frameSearchFactory" />
251+
<property name="prometheusMetrics" ref="prometheusMetricsCollector" />
252+
<property name="showDao" ref="showDao" />
251253
</bean>
252254

253255
<bean id="jobLauncher" class="com.imageworks.spcue.service.JobLauncher">
@@ -380,6 +382,7 @@
380382
<property name="serviceDao" ref="serviceDao" />
381383
<property name="showDao" ref="showDao" />
382384
<property name="kafkaEventPublisher" ref="kafkaEventPublisher" />
385+
<property name="prometheusMetrics" ref="prometheusMetricsCollector" />
383386
</bean>
384387

385388
<bean id="hostReportHandler" class="com.imageworks.spcue.dispatcher.HostReportHandler" destroy-method="shutdown">

docs/_docs/concepts/render-farm-monitoring.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ OpenCue provides a comprehensive monitoring system for tracking render farm oper
2121

2222
The monitoring system is built on an event-driven architecture that captures lifecycle events from jobs, layers, frames, hosts, and processes. These events can be:
2323

24+
![OpenCue Monitoring Grafana Dashboard](/assets/images/opencue_monitoring/opencue_monitoring_grafana_chart.png)
25+
2426
- **Published to Kafka** for real-time streaming and integration with external systems
2527
- **Stored in Elasticsearch** for historical analysis and querying
2628
- **Exposed as Prometheus metrics** for real-time dashboards and alerting
@@ -44,6 +46,8 @@ Cuebot publishes events to Apache Kafka topics when significant state changes oc
4446

4547
Events are published asynchronously to avoid impacting render farm performance. A bounded queue ensures the system remains responsive even under high load.
4648

49+
![UI for Apache Kafka](/assets/images/opencue_monitoring/opencue_monitoring_ui_for_apache_kafka.png)
50+
4751
### Historical storage (Elasticsearch)
4852

4953
The Kafka event consumer indexes events into Elasticsearch for long-term storage and analysis. This enables:
@@ -55,10 +59,14 @@ The Kafka event consumer indexes events into Elasticsearch for long-term storage
5559

5660
Elasticsearch indices are organized by event type and time-based partitioning for efficient querying.
5761

62+
![Elasticsearch](/assets/images/opencue_monitoring/opencue_monitoring_elasticsearch.png)
63+
5864
### Metrics collection (Prometheus)
5965

6066
Cuebot exposes a `/metrics` endpoint compatible with Prometheus. Key metrics include:
6167

68+
![Prometheus Metrics Interface](/assets/images/opencue_monitoring/opencue_monitoring_prometheus.png)
69+
6270
**Job and frame metrics:**
6371
- `cue_frames_completed_total` - Counter of completed frames by state
6472
- `cue_jobs_completed_total` - Counter of completed jobs by show

docs/_docs/developer-guide/monitoring-development.md

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public static void setActiveJobs(String show, String state, int count) {
181181

182182
### Index templates
183183

184-
Create custom index templates for new event types:
184+
Create custom index templates for new event types. Note that events use snake_case field names and include a `header` object:
185185

186186
```json
187187
{
@@ -192,13 +192,20 @@ Create custom index templates for new event types:
192192
},
193193
"mappings": {
194194
"properties": {
195-
"eventType": { "type": "keyword" },
196-
"timestamp": { "type": "date" },
197-
"jobId": { "type": "keyword" },
198-
"jobName": { "type": "keyword" },
199-
"showName": { "type": "keyword" },
200-
"oldPriority": { "type": "integer" },
201-
"newPriority": { "type": "integer" },
195+
"header": {
196+
"properties": {
197+
"event_id": { "type": "keyword" },
198+
"event_type": { "type": "keyword" },
199+
"timestamp": { "type": "date", "format": "epoch_millis" },
200+
"source_cuebot": { "type": "keyword" },
201+
"correlation_id": { "type": "keyword" }
202+
}
203+
},
204+
"job_id": { "type": "keyword" },
205+
"job_name": { "type": "keyword" },
206+
"show": { "type": "keyword" },
207+
"old_priority": { "type": "integer" },
208+
"new_priority": { "type": "integer" },
202209
"user": { "type": "keyword" }
203210
}
204211
}

0 commit comments

Comments
 (0)