Skip to content

Commit a14cc21

Browse files
authored
[Fix #744] Prerelease improvements (#745)
* [Fix #744] Suspend/resume status and associated event Workflow suspend and resume event should be sent regardless a task was in fact suspended or not. Same applies to workflow status. Signed-off-by: Francisco Javier Tirado Sarti <[email protected]> * [Fix #744] Add api dependency to test and example Although already included because jackson-impl depends on it, it is a good practise to explicitly add API as dependency because this dependency is an implementation detail and the usage of the reader API by the exampple is not. Signed-off-by: Francisco Javier Tirado Sarti <[email protected]> * [Fix #744] Disabling event life cycle publishing In some scenarios, users wont need the cloud events to be published. This allows user to disable publishing at application level. Signed-off-by: Francisco Javier Tirado Sarti <[email protected]> * [Fix #744] Improving EventPublisher/EventConsumer There might be more that one EventPublisher EventPublishers are either added programatically or through service loader. There will be only one consumer. If user want to handler more than one consuming event broker, he should provide an specific implementation that deals with them. The one consumer is added programatically (priortiy) or default to service loader. If not eventConsumer was provides, InMemoryEvents is used as event consumer and publisher. ChatBotIT and LifeCycleEventTest has been changed accordingly. Signed-off-by: Francisco Javier Tirado Sarti <[email protected]> * [Fix #744] EmitExecutor shorter version Signed-off-by: Francisco Javier Tirado Sarti <[email protected]> --------- Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent eb5718f commit a14cc21

File tree

12 files changed

+259
-201
lines changed

12 files changed

+259
-201
lines changed

examples/events/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515

1616

1717
<dependencies>
18+
<dependency>
19+
<groupId>io.serverlessworkflow</groupId>
20+
<artifactId>serverlessworkflow-api</artifactId>
21+
</dependency>
1822
<dependency>
1923
<groupId>io.serverlessworkflow</groupId>
2024
<artifactId>serverlessworkflow-impl-jackson</artifactId>

examples/simpleGet/pom.xml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
</properties>
1515

1616
<dependencies>
17+
<dependency>
18+
<groupId>io.serverlessworkflow</groupId>
19+
<artifactId>serverlessworkflow-api</artifactId>
20+
</dependency>
1721
<dependency>
1822
<groupId>io.serverlessworkflow</groupId>
1923
<artifactId>serverlessworkflow-impl-jackson</artifactId>
@@ -23,8 +27,8 @@
2327
<artifactId>serverlessworkflow-impl-http</artifactId>
2428
</dependency>
2529
<dependency>
26-
<groupId>org.glassfish.jersey.media</groupId>
27-
<artifactId>jersey-media-json-jackson</artifactId>
30+
<groupId>org.glassfish.jersey.media</groupId>
31+
<artifactId>jersey-media-json-jackson</artifactId>
2832
</dependency>
2933
<dependency>
3034
<groupId>org.glassfish.jersey.core</groupId>

fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,12 @@
2828
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
2929
import io.cloudevents.CloudEvent;
3030
import io.cloudevents.core.v1.CloudEventBuilder;
31-
import io.serverlessworkflow.api.types.EventFilter;
32-
import io.serverlessworkflow.api.types.EventProperties;
3331
import io.serverlessworkflow.api.types.Workflow;
3432
import io.serverlessworkflow.impl.WorkflowApplication;
3533
import io.serverlessworkflow.impl.WorkflowInstance;
3634
import io.serverlessworkflow.impl.WorkflowModel;
3735
import io.serverlessworkflow.impl.WorkflowStatus;
36+
import io.serverlessworkflow.impl.events.InMemoryEvents;
3837
import java.net.URI;
3938
import java.time.OffsetDateTime;
4039
import java.util.Map;
@@ -103,25 +102,15 @@ void chat_bot() {
103102
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished"))))
104103
.build();
105104

106-
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
107-
app.eventConsumer()
108-
.register(
109-
app.eventConsumer()
110-
.listen(
111-
new EventFilter()
112-
.withWith(new EventProperties().withType("org.acme.chatbot.reply")),
113-
app),
114-
ce -> replyEvents.add((CloudEvent) ce));
115-
116-
app.eventConsumer()
117-
.register(
118-
app.eventConsumer()
119-
.listen(
120-
new EventFilter()
121-
.withWith(new EventProperties().withType("org.acme.chatbot.finished")),
122-
app),
123-
ce -> finishedEvents.add((CloudEvent) ce));
105+
InMemoryEvents eventBroker = new InMemoryEvents();
106+
eventBroker.register("org.acme.chatbot.reply", ce -> replyEvents.add((CloudEvent) ce));
107+
eventBroker.register("org.acme.chatbot.finished", ce -> finishedEvents.add((CloudEvent) ce));
124108

109+
try (WorkflowApplication app =
110+
WorkflowApplication.builder()
111+
.withEventConsumer(eventBroker)
112+
.withEventPublisher(eventBroker)
113+
.build()) {
125114
final WorkflowInstance waitingInstance =
126115
app.workflowDefinition(listenWorkflow).instance(Map.of());
127116
final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start();
@@ -130,12 +119,12 @@ void chat_bot() {
130119
assertEquals(WorkflowStatus.WAITING, waitingInstance.status());
131120

132121
// Publish the events
133-
app.eventPublisher().publish(newMessageEvent("Hello World!"));
122+
eventBroker.publish(newMessageEvent("Hello World!"));
134123
CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS);
135124
assertNotNull(reply);
136125

137126
// Empty message completes the workflow
138-
app.eventPublisher().publish(newMessageEvent("", "org.acme.chatbot.finalize"));
127+
eventBroker.publish(newMessageEvent("", "org.acme.chatbot.finalize"));
139128
CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS);
140129
assertNotNull(finished);
141130
assertThat(finishedEvents).isEmpty();
@@ -145,6 +134,7 @@ void chat_bot() {
145134

146135
} catch (InterruptedException e) {
147136
fail(e.getMessage());
137+
} finally {
148138
}
149139
}
150140

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.serverlessworkflow.impl.resources.StaticResource;
3333
import io.serverlessworkflow.impl.schema.SchemaValidator;
3434
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
35+
import java.util.ArrayList;
3536
import java.util.Collection;
3637
import java.util.Collections;
3738
import java.util.Map;
@@ -58,7 +59,8 @@ public class WorkflowApplication implements AutoCloseable {
5859
private final ExecutorServiceFactory executorFactory;
5960
private final RuntimeDescriptorFactory runtimeDescriptorFactory;
6061
private final EventConsumer<?, ?> eventConsumer;
61-
private final EventPublisher eventPublisher;
62+
private final Collection<EventPublisher> eventPublishers;
63+
private final boolean lifeCycleCEPublishingEnabled;
6264

6365
private WorkflowApplication(Builder builder) {
6466
this.taskFactory = builder.taskFactory;
@@ -72,7 +74,8 @@ private WorkflowApplication(Builder builder) {
7274
this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet();
7375
this.definitions = new ConcurrentHashMap<>();
7476
this.eventConsumer = builder.eventConsumer;
75-
this.eventPublisher = builder.eventPublisher;
77+
this.eventPublishers = builder.eventPublishers;
78+
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
7679
}
7780

7881
public TaskExecutorFactory taskFactory() {
@@ -99,8 +102,8 @@ public Collection<WorkflowExecutionListener> listeners() {
99102
return listeners;
100103
}
101104

102-
public EventPublisher eventPublisher() {
103-
return eventPublisher;
105+
public Collection<EventPublisher> eventPublishers() {
106+
return eventPublishers;
104107
}
105108

106109
public WorkflowIdFactory idFactory() {
@@ -142,9 +145,10 @@ public SchemaValidator getValidator(SchemaInline inline) {
142145
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
143146
private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory();
144147
private EventConsumer<?, ?> eventConsumer;
145-
private EventPublisher eventPublisher;
148+
private Collection<EventPublisher> eventPublishers = new ArrayList<>();
146149
private RuntimeDescriptorFactory descriptorFactory =
147150
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
151+
private boolean lifeCycleCEPublishingEnabled = true;
148152

149153
private Builder() {}
150154

@@ -168,6 +172,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
168172
return this;
169173
}
170174

175+
public Builder disableLifeCycleCEPublishing() {
176+
this.lifeCycleCEPublishingEnabled = false;
177+
return this;
178+
}
179+
171180
public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) {
172181
this.executorFactory = executorFactory;
173182
return this;
@@ -193,10 +202,13 @@ public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) {
193202
return this;
194203
}
195204

196-
public Builder withEventHandler(
197-
EventPublisher eventPublisher, EventConsumer<?, ?> eventConsumer) {
205+
public Builder withEventConsumer(EventConsumer<?, ?> eventConsumer) {
198206
this.eventConsumer = eventConsumer;
199-
this.eventPublisher = eventPublisher;
207+
return this;
208+
}
209+
210+
public Builder withEventPublisher(EventPublisher eventPublisher) {
211+
this.eventPublishers.add(eventPublisher);
200212
return this;
201213
}
202214

@@ -219,10 +231,19 @@ public WorkflowApplication build() {
219231
.findFirst()
220232
.orElseGet(() -> DefaultTaskExecutorFactory.get());
221233
}
222-
if (eventConsumer == null && eventPublisher == null) {
223-
InMemoryEvents inMemory = new InMemoryEvents(executorFactory);
224-
eventPublisher = inMemory;
225-
eventConsumer = inMemory;
234+
ServiceLoader.load(EventPublisher.class).forEach(e -> eventPublishers.add(e));
235+
if (eventConsumer == null) {
236+
eventConsumer =
237+
ServiceLoader.load(EventConsumer.class)
238+
.findFirst()
239+
.orElseGet(
240+
() -> {
241+
InMemoryEvents inMemory = new InMemoryEvents(executorFactory);
242+
if (eventPublishers.isEmpty()) {
243+
eventPublishers.add(inMemory);
244+
}
245+
return inMemory;
246+
});
226247
}
227248
return new WorkflowApplication(this);
228249
}
@@ -242,8 +263,11 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) {
242263
@Override
243264
public void close() {
244265
safeClose(executorFactory);
245-
safeClose(eventPublisher);
266+
for (EventPublisher eventPublisher : eventPublishers) {
267+
safeClose(eventPublisher);
268+
}
246269
safeClose(eventConsumer);
270+
247271
for (WorkflowDefinition definition : definitions.values()) {
248272
safeClose(definition);
249273
}
@@ -278,4 +302,8 @@ public EventConsumer eventConsumer() {
278302
public ExecutorService executorService() {
279303
return executorFactory.get();
280304
}
305+
306+
public boolean isLifeCycleCEPublishingEnabled() {
307+
return lifeCycleCEPublishingEnabled;
308+
}
281309
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ public boolean suspend() {
176176
statusLock.lock();
177177
if (TaskExecutorHelper.isActive(status.get())) {
178178
suspended = new CompletableFuture<TaskContext>();
179+
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
180+
publishEvent(
181+
workflowContext,
182+
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
179183
return true;
180184
} else {
181185
return false;
@@ -197,11 +201,11 @@ public boolean resume() {
197201
publishEvent(
198202
workflowContext,
199203
l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask)));
200-
publishEvent(
201-
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
202204
} else {
203205
suspended = null;
204206
}
207+
publishEvent(
208+
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
205209
return true;
206210
} else {
207211
return false;
@@ -216,12 +220,8 @@ public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
216220
statusLock.lock();
217221
if (suspended != null) {
218222
suspendedTask = t;
219-
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
220223
publishEvent(
221224
workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t)));
222-
publishEvent(
223-
workflowContext,
224-
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
225225
return suspended;
226226
}
227227
if (cancelled != null) {

impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.DefaultExecutorServiceFactory;
1920
import io.serverlessworkflow.impl.ExecutorServiceFactory;
2021
import java.util.Map;
2122
import java.util.concurrent.CompletableFuture;
@@ -29,6 +30,10 @@
2930
*/
3031
public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher {
3132

33+
public InMemoryEvents() {
34+
this(new DefaultExecutorServiceFactory());
35+
}
36+
3237
public InMemoryEvents(ExecutorServiceFactory serviceFactory) {
3338
this.serviceFactory = serviceFactory;
3439
}
@@ -40,7 +45,7 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) {
4045
private AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();
4146

4247
@Override
43-
protected void register(String topicName, Consumer<CloudEvent> consumer) {
48+
public void register(String topicName, Consumer<CloudEvent> consumer) {
4449
topicMap.put(topicName, consumer);
4550
}
4651

@@ -77,6 +82,7 @@ protected void unregisterFromAll() {
7782

7883
@Override
7984
public void close() {
85+
topicMap.clear();
8086
serviceFactory.close();
8187
}
8288
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@
3333
import io.serverlessworkflow.impl.WorkflowUtils;
3434
import io.serverlessworkflow.impl.WorkflowValueResolver;
3535
import io.serverlessworkflow.impl.events.CloudEventUtils;
36+
import io.serverlessworkflow.impl.events.EventPublisher;
3637
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
3738
import io.serverlessworkflow.impl.resources.ResourceLoader;
3839
import java.net.URI;
3940
import java.time.OffsetDateTime;
41+
import java.util.Collection;
4042
import java.util.Map;
4143
import java.util.Optional;
4244
import java.util.concurrent.CompletableFuture;
@@ -74,11 +76,13 @@ private EmitExecutor(EmitExecutorBuilder builder) {
7476
@Override
7577
protected CompletableFuture<WorkflowModel> internalExecute(
7678
WorkflowContext workflow, TaskContext taskContext) {
77-
return workflow
78-
.definition()
79-
.application()
80-
.eventPublisher()
81-
.publish(buildCloudEvent(workflow, taskContext))
79+
Collection<EventPublisher> eventPublishers =
80+
workflow.definition().application().eventPublishers();
81+
CloudEvent ce = buildCloudEvent(workflow, taskContext);
82+
return CompletableFuture.allOf(
83+
eventPublishers.stream()
84+
.map(eventPublisher -> eventPublisher.publish(ce))
85+
.toArray(size -> new CompletableFuture[size]))
8286
.thenApply(v -> taskContext.input());
8387
}
8488

0 commit comments

Comments
 (0)