Skip to content

Provided a better solution for reading dead outbox entries #1940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 58 additions & 16 deletions java/outbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,29 +355,71 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for

:::

### Filter for Dead Entries
### Reading Dead Entries

This filtering can't be done on the database since the maximum number of attempts is only available from the CDS properties.

To ensure that only dead outbox entries are returned when reading `DeadOutboxMessages`, the following code provides the handler for the `DeadLetterQueueService` and the `@After-READ` handler that filters for the dead outbox entries:
This filtering of dead entries are done on the database by adding `where` clauses for each outbox and their maximum number of retries. The following code provides the handler for the `DeadLetterQueueService` that reads the dead entries directly from the entity `Messages`:

```java
@Component
@ServiceName(OutboxDeadLetterQueueService_.CDS_NAME)
public class DeadOutboxMessagesHandler implements EventHandler {

@After(entity = DeadOutboxMessages_.CDS_NAME)
public void filterDeadEntries(CdsReadEventContext context) {
CdsProperties.Outbox outboxConfigs = context.getCdsRuntime().getEnvironment().getCdsProperties().getOutbox();
List<DeadOutboxMessages> deadEntries = context
.getResult()
.listOf(DeadOutboxMessages.class)
.stream()
.filter(entry -> entry.getAttempts() >= outboxConfigs.getService(entry.getTarget()).getMaxAttempts())
.toList();

context.setResult(deadEntries);
}
private final PersistenceService db;

public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) {
this.db = db;
}

@On(service = PersistenceService.DEFAULT_NAME, entity = DeadOutboxMessages_.CDS_NAME)
public void readDeadOutboxMessages(CdsReadEventContext context) {
Optional<Predicate> outboxFilters = this.createOutboxFilters(context.getCdsRuntime());
CqnSelect cqn = context.getCqn();
Select<StructuredType<?>> select = Select
.from(Messages_.CDS_NAME)
.columns(cqn.items());

select = select.groupBy(cqn.groupBy());
select = select.excluding(cqn.excluding());
if(cqn.having().isPresent()) {
select = select.having(cqn.having().get());
}
if(cqn.search().isPresent()) {
select.search(cqn.search().get());
}
if(cqn.where().isPresent()) {
CqnPredicate where = cqn.where().get();
if (outboxFilters.isPresent()) {
where = outboxFilters.get().and(where);
}
select = select.where(where);
} else if (outboxFilters.isPresent()) {
select = select.where(outboxFilters.get());
}
select = select.orderBy(cqn.orderBy()).limit(cqn.top(), cqn.skip()).inlineCount();

List<Row> deadMessages = this.db.run(select).list();
context.setResult(ResultBuilder.selectedRows(deadMessages).inlineCount(deadMessages.size()).result());
}

private Optional<Predicate> createOutboxFilters(CdsRuntime runtime) {
List<OutboxService> outboxServices = runtime.getServiceCatalog().getServices(OutboxService.class)
.filter(s -> !s.getName().equals(OutboxService.INMEMORY_NAME)).toList();
CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox();

Predicate where = null;
for(OutboxService service : outboxServices) {
OutboxServiceConfig config = outboxConfigs.getService(service.getName());
Predicate targetPredicate = CQL.get(Messages.TARGET).eq(service.getName()).and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts()));

if (where == null) {
where = targetPredicate;
} else {
where = where.or(targetPredicate);
}
}

return Optional.ofNullable(where);
}
}
```

Expand Down