Skip to content

Commit 2663d47

Browse files
committed
Refactor Kafka message handling and introduce SummaryBook entity
1 parent 997b844 commit 2663d47

File tree

11 files changed

+88
-56
lines changed

11 files changed

+88
-56
lines changed
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package app.quantun.summary.message.consumer;
22

3-
import app.quantun.summary.model.contract.MessagePayload;
4-
import org.springframework.kafka.annotation.KafkaListener;
3+
import app.quantun.summary.model.contract.message.BookFilePayload;
54

65
import java.util.List;
7-
import java.util.Map;
86

97
public interface MyKafkaConsumer {
10-
void processMessages(List<MessagePayload> messages);
8+
void processMessages(List<BookFilePayload> messages);
119
}

src/main/java/app/quantun/summary/message/consumer/MyKafkaConsumerImpl.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,28 @@
11
package app.quantun.summary.message.consumer;
22

3-
import app.quantun.summary.model.contract.MessagePayload;
3+
import app.quantun.summary.model.contract.message.BookFilePayload;
4+
import app.quantun.summary.repository.SummaryBookRepository;
5+
import lombok.NoArgsConstructor;
6+
import lombok.RequiredArgsConstructor;
47
import lombok.extern.slf4j.Slf4j;
5-
import org.apache.kafka.clients.consumer.ConsumerRecord;
68
import org.springframework.kafka.annotation.KafkaListener;
79
import org.springframework.kafka.support.Acknowledgment;
8-
import org.springframework.messaging.handler.annotation.Payload;
910
import org.springframework.retry.annotation.Backoff;
1011
import org.springframework.retry.annotation.Retryable;
1112
import org.springframework.stereotype.Service;
1213

1314
import java.util.List;
14-
import java.util.Map;
1515

1616
/**
1717
* Kafka consumer service for processing messages from the "bulk-data" topic.
1818
* This class listens to Kafka messages and logs the received messages.
1919
*/
2020
@Service
2121
@Slf4j
22+
@RequiredArgsConstructor
2223
public class MyKafkaConsumerImpl implements MyKafkaConsumer {
2324

25+
private final SummaryBookRepository summaryBookRepository;
2426

2527
/**
2628
* Process incoming messages from the "bulk-data" topic.
@@ -40,7 +42,7 @@ public class MyKafkaConsumerImpl implements MyKafkaConsumer {
4042
maxAttempts = 3,
4143
backoff = @Backoff(delay = 2000)
4244
)
43-
public void listen(List<MessagePayload> messages, Acknowledgment ack) {
45+
public void listen(List<BookFilePayload> messages, Acknowledgment ack) {
4446
try {
4547
log.info("Received batch of {} messages", messages.size());
4648

@@ -62,9 +64,10 @@ public void listen(List<MessagePayload> messages, Acknowledgment ack) {
6264
*
6365
* @param messages List of MessagePayload to process.
6466
*/
65-
public void processMessages(List<MessagePayload> messages) {
67+
public void processMessages(List<BookFilePayload> messages) {
6668
// Your processing logic here
67-
for (MessagePayload message : messages) {
69+
for (BookFilePayload message : messages) {
70+
this.summaryBookRepository.findById(message.id());
6871
log.info("Processing message: {}", message);
6972
}
7073

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package app.quantun.summary.message.producer;
22

3+
import app.quantun.summary.model.contract.message.BookFilePayload;
4+
35
public interface KafkaProducerService {
4-
void sendHashMapMessage();
6+
7+
8+
void sendBookToBeProcessed(BookFilePayload message);
59
}
Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package app.quantun.summary.message.producer;
22

3-
import app.quantun.summary.model.contract.MessagePayload;
3+
import app.quantun.summary.model.contract.message.BookFilePayload;
44
import lombok.RequiredArgsConstructor;
55
import lombok.extern.slf4j.Slf4j;
66
import org.springframework.beans.factory.annotation.Value;
77
import org.springframework.kafka.core.KafkaTemplate;
88
import org.springframework.kafka.support.SendResult;
99
import org.springframework.stereotype.Service;
10-
import org.springframework.util.concurrent.ListenableFuture;
11-
import org.springframework.util.concurrent.ListenableFutureCallback;
1210

1311
import java.util.concurrent.CompletableFuture;
1412
import java.util.stream.IntStream;
@@ -22,38 +20,27 @@
2220
@Slf4j
2321
public class KafkaProducerServiceImpl implements KafkaProducerService {
2422

25-
private final KafkaTemplate<Object, MessagePayload> kafkaTemplate;
23+
private final KafkaTemplate<Object, BookFilePayload> kafkaTemplate;
2624

2725
// Externalize the topic name for better configurability
2826
@Value("${kafka.topic.bulk-data}")
2927
private String bulkDataTopic;
3028

3129

32-
/**
33-
* Sends a batch of HashMap messages to the "bulk-data" topic.
34-
* Each message contains an ID, name, and value.
35-
*/
36-
@Override
37-
public void sendHashMapMessage() {
38-
IntStream.rangeClosed(1, 10)
39-
.forEach(i -> {
40-
MessagePayload messagePayload = new MessagePayload(
41-
String.valueOf(i),
42-
"User" + i,
43-
"Some data for user " + i
44-
);
4530

46-
CompletableFuture<SendResult<Object, MessagePayload>> future = kafkaTemplate.send(bulkDataTopic, messagePayload).toCompletableFuture();
4731

48-
future.thenAccept(result ->
49-
log.info("Successfully sent message {} with payload: {}", i, messagePayload))
32+
@Override
33+
public void sendBookToBeProcessed(BookFilePayload message) {
34+
CompletableFuture<SendResult<Object, BookFilePayload>> future = kafkaTemplate.send(bulkDataTopic, message).toCompletableFuture();
5035

36+
future.thenAccept(result ->
37+
log.info("Successfully sent message with payload: {}", message))
5138

52-
.exceptionally(ex -> {
53-
log.error("Failed to send message {} with payload: {}. Exception: {}", i, messagePayload, ex.getMessage());
54-
return null;
55-
});
39+
.exceptionally(ex -> {
40+
log.error("Failed to send message with payload: {}. Exception: {}", message, ex.getMessage());
41+
return null;
5642
});
43+
5744
}
5845

5946
}

src/main/java/app/quantun/summary/model/contract/MessagePayload.java

Lines changed: 0 additions & 4 deletions
This file was deleted.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package app.quantun.summary.model.contract.message;
2+
3+
public record BookFilePayload(Long id, String name) {}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package app.quantun.summary.model.entity;
2+
3+
import jakarta.persistence.*;
4+
import lombok.*;
5+
6+
@Data
7+
@EqualsAndHashCode(of = "id")
8+
@Entity
9+
@Table(indexes = {
10+
@Index(name = "idx_summarybook_uuid", columnList = "uuid")
11+
})
12+
@Builder
13+
@AllArgsConstructor
14+
@NoArgsConstructor
15+
16+
public class SummaryBook {
17+
@Id
18+
@GeneratedValue(strategy = GenerationType.SEQUENCE)
19+
@Column(nullable = false)
20+
private Long id;
21+
private String name;
22+
private String path;
23+
@Column(columnDefinition = "TEXT")
24+
// add index
25+
26+
private String uuid;
27+
28+
29+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package app.quantun.summary.repository;
2+
3+
import app.quantun.summary.model.entity.SummaryBook;
4+
import org.springframework.data.jpa.repository.JpaRepository;
5+
6+
public interface SummaryBookRepository extends JpaRepository<SummaryBook, Long> {
7+
}

src/main/java/app/quantun/summary/service/impl/FileStorageServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,6 @@ private String saveFileToDisk(MultipartFile file, Path uploadPath) throws IOExce
9292
Path filePath = uploadPath.resolve(fileName);
9393
log.info("Saving file to disk at: {}", filePath);
9494
file.transferTo(filePath);
95-
return fileName;
95+
return filePath.toString();
9696
}
9797
}

src/main/java/app/quantun/summary/service/impl/PdfServicesImpl.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
import app.quantun.summary.message.producer.KafkaProducerService;
44
import app.quantun.summary.model.contract.dto.TableIndexContent;
5+
import app.quantun.summary.model.contract.message.BookFilePayload;
6+
import app.quantun.summary.model.entity.SummaryBook;
7+
import app.quantun.summary.repository.SummaryBookRepository;
58
import app.quantun.summary.service.FileStorageService;
69
import app.quantun.summary.service.PdfServices;
710
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -22,6 +25,7 @@
2225
import org.springframework.web.multipart.MultipartFile;
2326

2427
import java.util.Map;
28+
import java.util.UUID;
2529

2630
/**
2731
* Service implementation for PDF-related operations.
@@ -39,18 +43,11 @@ public class PdfServicesImpl implements PdfServices {
3943
private final ChatClient openAiChatClient;
4044
@Qualifier("geminiChatClient")
4145
private final ChatClient geminiChatClient;
42-
private final ObjectMapper objectMapper;
46+
4347
private final ResourceLoader resourceLoader;
44-
@Value("classpath:/templates/rag-prompt-template.st")
45-
private Resource ressourceRagPromptTemplate;
46-
@Value("classpath:/templates/system-message.st")
47-
private Resource resourceSystemMessageTemplate;
48-
@Value("classpath:/templates/rag-prompt-without-metadata-template.st")
49-
private Resource resourceRagPromptWithoutMedataTemplate;
50-
@Value("classpath:templates/get-capital-prompt.st")
51-
private Resource resourceCapitalPromptTemplate;
52-
@Value("classpath:templates/get-capital-with-info.st")
53-
private Resource resourceCapitalWithInfoPromptTemplate;
48+
49+
private final SummaryBookRepository summaryBookRepository;
50+
5451
@Value("classpath:templates/system/table.content.st")
5552
private Resource tableOfContentPromptTemplate;
5653

@@ -84,8 +81,16 @@ public TableIndexContent getBookTableOfContentPages(String message) {
8481
*/
8582
@Override
8683
public String storePdfFile(MultipartFile file) {
87-
val message = this.fileStorageService.storePdfFile(file);
88-
this.kafkaProducerService.sendHashMapMessage();
89-
return message;
84+
val bookPath = this.fileStorageService.storePdfFile(file);
85+
val summaryBook = SummaryBook.builder()
86+
.uuid(UUID.randomUUID().toString())
87+
.name(file.getOriginalFilename())
88+
.path(bookPath)
89+
.build();
90+
val savedSummaryBook = this.summaryBookRepository.save(summaryBook);
91+
92+
val message=new BookFilePayload( savedSummaryBook.getId(),savedSummaryBook.getName());
93+
this.kafkaProducerService.sendBookToBeProcessed(message);
94+
return message.toString();
9095
}
9196
}

0 commit comments

Comments
 (0)