Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/events/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@


<dependencies>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-api</artifactId>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl-jackson</artifactId>
Expand Down
8 changes: 6 additions & 2 deletions examples/simpleGet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
</properties>

<dependencies>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-api</artifactId>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl-jackson</artifactId>
Expand All @@ -23,8 +27,8 @@
<artifactId>serverlessworkflow-impl-http</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventBuilder;
import io.serverlessworkflow.api.types.EventFilter;
import io.serverlessworkflow.api.types.EventProperties;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowInstance;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowStatus;
import io.serverlessworkflow.impl.events.InMemoryEvents;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Map;
Expand Down Expand Up @@ -103,25 +102,15 @@ void chat_bot() {
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished"))))
.build();

try (WorkflowApplication app = WorkflowApplication.builder().build()) {
app.eventConsumer()
.register(
app.eventConsumer()
.listen(
new EventFilter()
.withWith(new EventProperties().withType("org.acme.chatbot.reply")),
app),
ce -> replyEvents.add((CloudEvent) ce));

app.eventConsumer()
.register(
app.eventConsumer()
.listen(
new EventFilter()
.withWith(new EventProperties().withType("org.acme.chatbot.finished")),
app),
ce -> finishedEvents.add((CloudEvent) ce));
InMemoryEvents eventBroker = new InMemoryEvents();
eventBroker.register("org.acme.chatbot.reply", ce -> replyEvents.add((CloudEvent) ce));
eventBroker.register("org.acme.chatbot.finished", ce -> finishedEvents.add((CloudEvent) ce));

try (WorkflowApplication app =
WorkflowApplication.builder()
.withEventConsumer(eventBroker)
.withEventPublisher(eventBroker)
.build()) {
final WorkflowInstance waitingInstance =
app.workflowDefinition(listenWorkflow).instance(Map.of());
final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start();
Expand All @@ -130,12 +119,12 @@ void chat_bot() {
assertEquals(WorkflowStatus.WAITING, waitingInstance.status());

// Publish the events
app.eventPublisher().publish(newMessageEvent("Hello World!"));
eventBroker.publish(newMessageEvent("Hello World!"));
CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS);
assertNotNull(reply);

// Empty message completes the workflow
app.eventPublisher().publish(newMessageEvent("", "org.acme.chatbot.finalize"));
eventBroker.publish(newMessageEvent("", "org.acme.chatbot.finalize"));
CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS);
assertNotNull(finished);
assertThat(finishedEvents).isEmpty();
Expand All @@ -145,6 +134,7 @@ void chat_bot() {

} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.serverlessworkflow.impl.resources.StaticResource;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand All @@ -58,7 +59,8 @@ public class WorkflowApplication implements AutoCloseable {
private final ExecutorServiceFactory executorFactory;
private final RuntimeDescriptorFactory runtimeDescriptorFactory;
private final EventConsumer<?, ?> eventConsumer;
private final EventPublisher eventPublisher;
private final Collection<EventPublisher> eventPublishers;
private final boolean lifeCycleCEPublishingEnabled;

private WorkflowApplication(Builder builder) {
this.taskFactory = builder.taskFactory;
Expand All @@ -72,7 +74,8 @@ private WorkflowApplication(Builder builder) {
this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet();
this.definitions = new ConcurrentHashMap<>();
this.eventConsumer = builder.eventConsumer;
this.eventPublisher = builder.eventPublisher;
this.eventPublishers = builder.eventPublishers;
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
}

public TaskExecutorFactory taskFactory() {
Expand All @@ -99,8 +102,8 @@ public Collection<WorkflowExecutionListener> listeners() {
return listeners;
}

public EventPublisher eventPublisher() {
return eventPublisher;
public Collection<EventPublisher> eventPublishers() {
return eventPublishers;
}

public WorkflowIdFactory idFactory() {
Expand Down Expand Up @@ -142,9 +145,10 @@ public SchemaValidator getValidator(SchemaInline inline) {
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory();
private EventConsumer<?, ?> eventConsumer;
private EventPublisher eventPublisher;
private Collection<EventPublisher> eventPublishers = new ArrayList<>();
private RuntimeDescriptorFactory descriptorFactory =
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
private boolean lifeCycleCEPublishingEnabled = true;

private Builder() {}

Expand All @@ -168,6 +172,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
return this;
}

public Builder disableLifeCycleCEPublishing() {
this.lifeCycleCEPublishingEnabled = false;
return this;
}

public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) {
this.executorFactory = executorFactory;
return this;
Expand All @@ -193,10 +202,13 @@ public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) {
return this;
}

public Builder withEventHandler(
EventPublisher eventPublisher, EventConsumer<?, ?> eventConsumer) {
public Builder withEventConsumer(EventConsumer<?, ?> eventConsumer) {
this.eventConsumer = eventConsumer;
this.eventPublisher = eventPublisher;
return this;
}

public Builder withEventPublisher(EventPublisher eventPublisher) {
this.eventPublishers.add(eventPublisher);
return this;
}

Expand All @@ -219,10 +231,19 @@ public WorkflowApplication build() {
.findFirst()
.orElseGet(() -> DefaultTaskExecutorFactory.get());
}
if (eventConsumer == null && eventPublisher == null) {
InMemoryEvents inMemory = new InMemoryEvents(executorFactory);
eventPublisher = inMemory;
eventConsumer = inMemory;
ServiceLoader.load(EventPublisher.class).forEach(e -> eventPublishers.add(e));
if (eventConsumer == null) {
eventConsumer =
ServiceLoader.load(EventConsumer.class)
.findFirst()
.orElseGet(
() -> {
InMemoryEvents inMemory = new InMemoryEvents(executorFactory);
if (eventPublishers.isEmpty()) {
eventPublishers.add(inMemory);
}
return inMemory;
});
}
return new WorkflowApplication(this);
}
Expand All @@ -242,8 +263,11 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) {
@Override
public void close() {
safeClose(executorFactory);
safeClose(eventPublisher);
for (EventPublisher eventPublisher : eventPublishers) {
safeClose(eventPublisher);
}
safeClose(eventConsumer);

for (WorkflowDefinition definition : definitions.values()) {
safeClose(definition);
}
Expand Down Expand Up @@ -278,4 +302,8 @@ public EventConsumer eventConsumer() {
public ExecutorService executorService() {
return executorFactory.get();
}

public boolean isLifeCycleCEPublishingEnabled() {
return lifeCycleCEPublishingEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ public boolean suspend() {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get())) {
suspended = new CompletableFuture<TaskContext>();
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
publishEvent(
workflowContext,
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
return true;
} else {
return false;
Expand All @@ -197,11 +201,11 @@ public boolean resume() {
publishEvent(
workflowContext,
l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask)));
publishEvent(
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
} else {
suspended = null;
}
publishEvent(
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
return true;
} else {
return false;
Expand All @@ -216,12 +220,8 @@ public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
statusLock.lock();
if (suspended != null) {
suspendedTask = t;
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
publishEvent(
workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t)));
publishEvent(
workflowContext,
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
return suspended;
}
if (cancelled != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.serverlessworkflow.impl.events;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.DefaultExecutorServiceFactory;
import io.serverlessworkflow.impl.ExecutorServiceFactory;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,6 +30,10 @@
*/
public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher {

public InMemoryEvents() {
this(new DefaultExecutorServiceFactory());
}

public InMemoryEvents(ExecutorServiceFactory serviceFactory) {
this.serviceFactory = serviceFactory;
}
Expand All @@ -40,7 +45,7 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) {
private AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();

@Override
protected void register(String topicName, Consumer<CloudEvent> consumer) {
public void register(String topicName, Consumer<CloudEvent> consumer) {
topicMap.put(topicName, consumer);
}

Expand Down Expand Up @@ -77,6 +82,7 @@ protected void unregisterFromAll() {

@Override
public void close() {
topicMap.clear();
serviceFactory.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.events.CloudEventUtils;
import io.serverlessworkflow.impl.events.EventPublisher;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -74,11 +76,13 @@ private EmitExecutor(EmitExecutorBuilder builder) {
@Override
protected CompletableFuture<WorkflowModel> internalExecute(
WorkflowContext workflow, TaskContext taskContext) {
return workflow
.definition()
.application()
.eventPublisher()
.publish(buildCloudEvent(workflow, taskContext))
Collection<EventPublisher> eventPublishers =
workflow.definition().application().eventPublishers();
CloudEvent ce = buildCloudEvent(workflow, taskContext);
return CompletableFuture.allOf(
eventPublishers.stream()
.map(eventPublisher -> eventPublisher.publish(ce))
.toArray(size -> new CompletableFuture[size]))
.thenApply(v -> taskContext.input());
}

Expand Down
Loading