@@ -19,47 +19,62 @@ This guide explains how to extend, customize, and develop against the OpenCue mo
1919
2020## Architecture overview
2121
22- The monitoring system is implemented in Cuebot and consists of :
22+ The monitoring system uses a decoupled architecture with Cuebot publishing events to Kafka and a standalone Rust-based indexer consuming events for Elasticsearch storage :
2323
2424```
2525┌────────────────────────────────────────────────────────────────────────────┐
2626│ Cuebot │
2727│ │
2828│ ┌─────────────┐ ┌─────────────────────┐ │
29- │ │ Service │────>│ KafkaEventPublisher │───────> Kafka │
30- │ │ Layer │ └─────────────────────┘ │ │
31- │ └─────────────┘ │ │ │
32- │ │ │ v │
33- │ │ v ┌───────────────────┐ │
34- │ │ ┌──────────────┐ │ KafkaEventConsumer│ │
35- │ └─────────────>│ Prometheus │ └───────────────────┘ │
36- │ │ Metrics │ │ │
37- │ └──────────────┘ v │
38- │ ┌───────────────────┐ │
39- │ │ ElasticsearchClient│ │
40- │ └───────────────────┘ │
41- │ │ │
42- └─────────────────────────────────────────────────────────│──────────────────┘
43- v
44- Elasticsearch
29+ │ │ Service │────>│ KafkaEventPublisher │──────────> Kafka │
30+ │ │ Layer │ └─────────────────────┘ │ │
31+ │ └─────────────┘ │ │ │
32+ │ │ v │ │
33+ │ └─────────────>┌──────────────┐ │ │
34+ │ │ Prometheus │ │ │
35+ │ │ Metrics │ │ │
36+ │ └──────────────┘ │ │
37+ └────────────────────────────────────────────────────────────│───────────────┘
38+ │
39+ v
40+ ┌────────────────────────────────────────────────────────────────────────────┐
41+ │ kafka-es-indexer (Rust) │
42+ │ │
43+ │ ┌───────────────────┐ ┌─────────────────────────┐ │
44+ │ │ Kafka Consumer │────────>│ Elasticsearch Client │ │
45+ │ │ (rdkafka) │ │ (bulk indexing) │ │
46+ │ └───────────────────┘ └─────────────────────────┘ │
47+ │ │ │
48+ └────────────────────────────────────────────│───────────────────────────────┘
49+ v
50+ Elasticsearch
4551```
4652
4753** Data flow:**
48541 . ** Service Layer** (e.g., FrameCompleteHandler, HostReportHandler) generates events and calls KafkaEventPublisher
49552 . ** KafkaEventPublisher** serializes events as JSON and publishes them to Kafka topics
50- 3 . ** KafkaEventConsumer ** subscribes to Kafka topics and receives published events
51- 4 . ** KafkaEventConsumer ** uses ** ElasticsearchClient ** to index events into Elasticsearch for historical storage
56+ 3 . ** kafka-es-indexer ** (standalone Rust service) consumes events from Kafka topics
57+ 4 . ** kafka-es-indexer ** bulk indexes events into Elasticsearch for historical storage
52585 . ** Prometheus Metrics** are updated directly by the Service Layer and KafkaEventPublisher (for queue metrics)
5359
54- ### Key classes
60+ ### Key components
5561
56- | Class | Location | Purpose |
57- | -------| ----------| ---------|
62+ | Component | Location | Purpose |
63+ | ----------- | ----------| ---------|
5864| ` KafkaEventPublisher ` | ` com.imageworks.spcue.monitoring ` | Publishes events to Kafka |
59- | ` KafkaEventConsumer ` | ` com.imageworks.spcue.monitoring ` | Consumes events from Kafka for ES indexing |
60- | ` ElasticsearchClient ` | ` com.imageworks.spcue.monitoring ` | Writes events to Elasticsearch |
6165| ` MonitoringEventBuilder ` | ` com.imageworks.spcue.monitoring ` | Builds event payloads |
6266| ` PrometheusMetricsCollector ` | ` com.imageworks.spcue ` | Exposes Prometheus metrics |
67+ | ` kafka-es-indexer ` | ` rust/crates/kafka-es-indexer/ ` | Consumes Kafka, indexes to Elasticsearch |
68+
69+ ### Why a separate indexer?
70+
71+ The Kafka-to-Elasticsearch indexer is implemented as a standalone Rust service rather than within Cuebot for several reasons:
72+
73+ - ** Decoupling** : Cuebot focuses on core scheduling; indexing is a separate concern
74+ - ** Scalability** : The indexer can be scaled independently from Cuebot
75+ - ** Reliability** : Kafka buffering ensures events are not lost if Elasticsearch is temporarily unavailable
76+ - ** Performance** : Rust provides efficient resource usage for high-throughput event processing
77+ - ** Operational flexibility** : The indexer can be updated, restarted, or replayed without affecting Cuebot
6378
6479## Adding new event types
6580
@@ -201,6 +216,8 @@ public static void setActiveJobs(String show, String state, int count) {
201216
202217## Customizing Elasticsearch indexing
203218
219+ The ` kafka-es-indexer ` service handles all Elasticsearch indexing. It automatically routes events to indices based on the Kafka topic name.
220+
204221### Index templates
205222
206223Create custom index templates for new event types. Note that events use snake_case field names and include a ` header ` object:
@@ -234,27 +251,18 @@ Create custom index templates for new event types. Note that events use snake_ca
234251}
235252```
236253
237- ### Custom indexing logic
254+ ### Index naming convention
238255
239- Extend ` ElasticsearchClient ` to add custom indexing :
256+ The kafka-es-indexer creates daily indices using the pattern :
240257
241- ``` java
242- // ElasticsearchClient.java
243- public void indexJobAdminEvent(MonitoringEvent event) {
244- String indexName = " opencue-job-admin-" +
245- LocalDate . now(). format(DateTimeFormatter . ISO_DATE );
246-
247- Map<String , Object > document = new HashMap<> ();
248- document. put(" eventType" , event. getEventType(). name());
249- document. put(" timestamp" , event. getTimestamp());
250- document. put(" jobId" , event. getJobId());
251- document. put(" jobName" , event. getJobName());
252- document. putAll(event. getMetadataMap());
253-
254- indexDocument(indexName, document);
255- }
258+ ```
259+ {topic-name-converted}-YYYY-MM-DD
256260```
257261
262+ For example:
263+ - ` opencue.job.events ` → ` opencue-job-events-2024-11-29 `
264+ - ` opencue.frame.events ` → ` opencue-frame-events-2024-11-29 `
265+
258266## Testing
259267
260268### Unit testing event builders
@@ -312,15 +320,46 @@ public class KafkaEventPublisherIntegrationTest {
312320| ` monitoring.kafka.linger.ms ` | ` 100 ` | Time to wait before sending batch |
313321| ` monitoring.kafka.acks ` | ` 1 ` | Required acknowledgments |
314322
315- ### Elasticsearch configuration
323+ ### kafka-es-indexer configuration
316324
317- | Property | Default | Description |
318- | ----------| ---------| -------------|
319- | ` monitoring.elasticsearch.enabled ` | ` false ` | Enable ES storage |
320- | ` monitoring.elasticsearch.host ` | ` localhost ` | ES host |
321- | ` monitoring.elasticsearch.port ` | ` 9200 ` | ES port |
322- | ` monitoring.elasticsearch.scheme ` | ` http ` | Connection scheme |
323- | ` monitoring.elasticsearch.index.prefix ` | ` opencue ` | Index name prefix |
325+ The kafka-es-indexer is configured via command-line arguments, environment variables, or a YAML config file:
326+
327+ | CLI Argument | Env Variable | Default | Description |
328+ | --------------| --------------| ---------| -------------|
329+ | ` --kafka-servers ` | ` KAFKA_BOOTSTRAP_SERVERS ` | ` localhost:9092 ` | Kafka broker addresses |
330+ | ` --kafka-group-id ` | ` KAFKA_GROUP_ID ` | ` opencue-elasticsearch-indexer ` | Consumer group ID |
331+ | ` --elasticsearch-url ` | ` ELASTICSEARCH_URL ` | ` http://localhost:9200 ` | Elasticsearch URL |
332+ | ` --index-prefix ` | ` ELASTICSEARCH_INDEX_PREFIX ` | ` opencue ` | Elasticsearch index prefix |
333+ | ` --log-level ` | ` LOG_LEVEL ` | ` info ` | Log level (debug, info, warn, error) |
334+ | ` --config ` | - | - | Path to YAML config file |
335+
336+ The indexer automatically subscribes to all OpenCue Kafka topics:
337+ - ` opencue.job.events `
338+ - ` opencue.layer.events `
339+ - ` opencue.frame.events `
340+ - ` opencue.host.events `
341+ - ` opencue.proc.events `
342+
343+ Example with CLI arguments:
344+
345+ ``` bash
346+ kafka-es-indexer \
347+ --kafka-servers kafka:9092 \
348+ --kafka-group-id opencue-elasticsearch-indexer \
349+ --elasticsearch-url http://elasticsearch:9200 \
350+ --index-prefix opencue \
351+ --log-level info
352+ ```
353+
354+ Example with environment variables:
355+
356+ ``` bash
357+ export KAFKA_BOOTSTRAP_SERVERS=kafka:9092
358+ export KAFKA_GROUP_ID=opencue-elasticsearch-indexer
359+ export ELASTICSEARCH_URL=http://elasticsearch:9200
360+ export ELASTICSEARCH_INDEX_PREFIX=opencue
361+ kafka-es-indexer
362+ ```
324363
325364### Prometheus configuration
326365
@@ -331,23 +370,14 @@ public class KafkaEventPublisherIntegrationTest {
331370
332371## Debugging
333372
334- ### Enable debug logging
373+ ### Enable debug logging in Cuebot
335374
336375Add to ` log4j2.xml ` :
337376
338377``` xml
339378<Logger name =" com.imageworks.spcue.monitoring" level =" DEBUG" />
340379```
341380
342- ### Check event queue status
343-
344- Monitor the event queue via metrics:
345-
346- ``` promql
347- cue_monitoring_event_queue_size
348- cue_monitoring_events_dropped_total
349- ```
350-
351381### Verify Kafka connectivity
352382
353383``` bash
@@ -360,6 +390,23 @@ kafka-consumer-groups --bootstrap-server kafka:9092 \
360390 --group opencue-elasticsearch-indexer --describe
361391```
362392
393+ ### Debugging kafka-es-indexer
394+
395+ ``` bash
396+ # View indexer logs
397+ docker logs opencue-kafka-es-indexer
398+
399+ # Check indexer help
400+ docker exec opencue-kafka-es-indexer kafka-es-indexer --help
401+
402+ # Verify Elasticsearch indices are being created
403+ curl -s " http://localhost:9200/_cat/indices/opencue-*?v"
404+
405+ # Check event counts in Elasticsearch
406+ curl -s " http://localhost:9200/opencue-job-events-*/_count"
407+ curl -s " http://localhost:9200/opencue-frame-events-*/_count"
408+ ```
409+
363410## Best practices
364411
365412### Event design
0 commit comments