Skip to content

Commit 8f39cc3

Browse files
[cuebot/rust/sandbox] Decouple Kafka consumer into standalone Rust indexer
Move Kafka-to-Elasticsearch event indexing from Cuebot to a standalone Rust service, addressing code review feedback to decouple the consumer from the Java codebase. Rust kafka-es-indexer: - Add rust/crates/kafka-es-indexer: standalone Kafka consumer that indexes OpenCue events (job, layer, frame, host, proc) to Elasticsearch - Async Kafka consumer with configurable batch processing - Elasticsearch bulk indexing with date-based indices - Index templates with proper field mappings for all event types - CLI with environment variable configuration Cuebot cleanup: - Remove Java KafkaEventConsumer and ElasticsearchClient classes - Remove getJobHistory, getFrameHistory, getLayerHistory, getLayerMemoryHistory from HistoricalDao and HistoricalManager - Update ManageMonitoring gRPC servant to return UNIMPLEMENTED with message directing users to query Elasticsearch directly - Keep KafkaEventPublisher for publishing events from Cuebot to Kafka - Keep core job archival methods (getFinishedJobs, transferJob) intact Infrastructure: - Update docker-compose.monitoring-full.yml to include kafka-es-indexer
1 parent b03689f commit 8f39cc3

File tree

21 files changed

+2172
-1420
lines changed

21 files changed

+2172
-1420
lines changed

cuebot/src/main/java/com/imageworks/spcue/dao/HistoricalDao.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,6 @@
1818
import java.util.List;
1919

2020
import com.imageworks.spcue.JobInterface;
21-
import com.imageworks.spcue.grpc.job.FrameState;
22-
import com.imageworks.spcue.grpc.job.JobState;
23-
import com.imageworks.spcue.grpc.monitoring.HistoricalFrame;
24-
import com.imageworks.spcue.grpc.monitoring.HistoricalJob;
25-
import com.imageworks.spcue.grpc.monitoring.HistoricalLayer;
26-
import com.imageworks.spcue.grpc.monitoring.LayerMemoryRecord;
2721

2822
public interface HistoricalDao {
2923

@@ -42,29 +36,4 @@ public interface HistoricalDao {
4236
*/
4337
void transferJob(JobInterface job);
4438

45-
/**
46-
* Query historical job records from the job_history table.
47-
*/
48-
List<HistoricalJob> getJobHistory(List<String> shows, List<String> users, List<String> shots,
49-
List<String> jobNameRegex, List<JobState> states, long startTime, long endTime,
50-
int page, int pageSize, int maxResults);
51-
52-
/**
53-
* Query historical frame records from the frame_history table.
54-
*/
55-
List<HistoricalFrame> getFrameHistory(String jobId, String jobName, List<String> layerNames,
56-
List<FrameState> states, long startTime, long endTime, int page, int pageSize);
57-
58-
/**
59-
* Query historical layer records from the layer_history table.
60-
*/
61-
List<HistoricalLayer> getLayerHistory(String jobId, String jobName, long startTime,
62-
long endTime, int page, int pageSize);
63-
64-
/**
65-
* Query historical memory usage for a layer type.
66-
*/
67-
List<LayerMemoryRecord> getLayerMemoryHistory(String layerName, List<String> shows,
68-
long startTime, long endTime, int maxResults);
69-
7039
}

cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HistoricalDaoJdbc.java

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,16 @@
1515

1616
package com.imageworks.spcue.dao.postgres;
1717

18-
import java.util.Collections;
1918
import java.util.List;
2019

21-
import org.springframework.beans.factory.annotation.Autowired;
2220
import org.springframework.jdbc.core.support.JdbcDaoSupport;
2321

2422
import com.imageworks.spcue.JobInterface;
2523
import com.imageworks.spcue.dao.HistoricalDao;
26-
import com.imageworks.spcue.grpc.job.FrameState;
2724
import com.imageworks.spcue.grpc.job.JobState;
28-
import com.imageworks.spcue.grpc.monitoring.HistoricalFrame;
29-
import com.imageworks.spcue.grpc.monitoring.HistoricalJob;
30-
import com.imageworks.spcue.grpc.monitoring.HistoricalLayer;
31-
import com.imageworks.spcue.grpc.monitoring.LayerMemoryRecord;
32-
import com.imageworks.spcue.monitoring.ElasticsearchClient;
3325

3426
public class HistoricalDaoJdbc extends JdbcDaoSupport implements HistoricalDao {
3527

36-
@Autowired(required = false)
37-
private ElasticsearchClient elasticsearchClient;
38-
3928
private static final String GET_FINISHED_JOBS = JobDaoJdbc.GET_JOB + "WHERE "
4029
+ "job.str_state = ? " + "AND " + "current_timestamp - job.ts_stopped > ";
4130

@@ -51,46 +40,4 @@ public void transferJob(JobInterface job) {
5140
*/
5241
getJdbcTemplate().update("DELETE FROM job WHERE pk_job=?", job.getJobId());
5342
}
54-
55-
@Override
56-
public List<HistoricalJob> getJobHistory(List<String> shows, List<String> users,
57-
List<String> shots, List<String> jobNameRegex, List<JobState> states, long startTime,
58-
long endTime, int page, int pageSize, int maxResults) {
59-
if (elasticsearchClient != null && elasticsearchClient.isEnabled()) {
60-
return elasticsearchClient.searchJobHistory(shows, users, shots, jobNameRegex, states,
61-
startTime, endTime, page, pageSize, maxResults);
62-
}
63-
return Collections.emptyList();
64-
}
65-
66-
@Override
67-
public List<HistoricalFrame> getFrameHistory(String jobId, String jobName,
68-
List<String> layerNames, List<FrameState> states, long startTime, long endTime,
69-
int page, int pageSize) {
70-
if (elasticsearchClient != null && elasticsearchClient.isEnabled()) {
71-
return elasticsearchClient.searchFrameHistory(jobId, jobName, layerNames, states,
72-
startTime, endTime, page, pageSize);
73-
}
74-
return Collections.emptyList();
75-
}
76-
77-
@Override
78-
public List<HistoricalLayer> getLayerHistory(String jobId, String jobName, long startTime,
79-
long endTime, int page, int pageSize) {
80-
if (elasticsearchClient != null && elasticsearchClient.isEnabled()) {
81-
return elasticsearchClient.searchLayerHistory(jobId, jobName, startTime, endTime, page,
82-
pageSize);
83-
}
84-
return Collections.emptyList();
85-
}
86-
87-
@Override
88-
public List<LayerMemoryRecord> getLayerMemoryHistory(String layerName, List<String> shows,
89-
long startTime, long endTime, int maxResults) {
90-
if (elasticsearchClient != null && elasticsearchClient.isEnabled()) {
91-
return elasticsearchClient.searchLayerMemoryHistory(layerName, shows, startTime,
92-
endTime, maxResults);
93-
}
94-
return Collections.emptyList();
95-
}
9643
}

0 commit comments

Comments
 (0)