3 changes: 3 additions & 0 deletions docs/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
@@ -211,7 +211,10 @@
** Processing data
*** xref:pipelines:transforms.adoc[]
*** xref:pipelines:custom-aggregate-operation.adoc[]
*** xref:pipelines:extensions.adoc[]
*** xref:pipelines:python.adoc[]
*** xref:pipelines:map-transforms.adoc[]
*** xref:pipelines:custom-extension.adoc[]
** Sending results to sinks
*** xref:pipelines:sending-data-to-sinks.adoc[Overview]
*** xref:pipelines:custom-sink.adoc[]
1 change: 1 addition & 0 deletions docs/modules/integrate/pages/map-connector.adoc
@@ -2,6 +2,7 @@
:description: A map is a distributed in-memory key-value data structure that can be used as a batch or streaming data source as well as a data sink.

A map is a distributed in-memory key-value data structure that can be used as a batch or streaming data source as well as a data sink. See xref:data-structures:map.adoc[].
IMaps can also be used in pipeline transforms; see xref:pipelines:map-transforms.adoc[].

== Installing the Connector

202 changes: 202 additions & 0 deletions docs/modules/pipelines/pages/custom-extension.adoc
@@ -0,0 +1,202 @@
= Create a Custom Stage Extension

You can implement custom, reusable xref:extensions.adoc[Pipeline API stage extensions].
This tutorial is based on the `PythonExtension` available in Hazelcast and walks through,
step by step, how it is created.

== Before you begin

`PythonExtension` already exists, but to recreate it by following the steps in this tutorial, you need:

1. A Java project with dependencies on the `hazelcast` and `hazelcast-jet-python` modules
2. A Python version compatible with Jet (see xref:transforms.adoc#_mapusingpython[mapUsingPython requirements])
3. A Python script with a function to invoke

You can reuse the setup from the xref:python.adoc#_before_you_begin[Apply a Custom Transform with Python] tutorial.

== Step 1. Define supported stage types

The first step is to decide which stages the extension applies to.
You do this by implementing the `StageExtension` interface of the appropriate stage class.
`StreamStage` and `BatchStage` are the most common choices, and it is often relatively easy
to support both kinds of stages, which makes the extension more flexible.
In some cases you may also want to support `StreamStageWithKey` and `BatchStageWithKey`.

[source,java]
----
public interface PythonExtension extends
        StreamStage.StageExtension<String, /*...*/>,
        BatchStage.StageExtension<String, /*...*/> {
}
----

The Python extension supports both stream and batch stages.
The Python transformation works only with `String` items, so the item type is fixed here.
In more general cases, the item type should be a generic type parameter.

== Step 2. Define API for the extension

Next, you need to define the fluent API that the extension makes available.
It is recommended to use interfaces to clearly separate the API from the implementation.

Usually the extension ultimately returns to the original `StreamStage` or `BatchStage` API,
but possibly with a changed item type, e.g. due to a mapping performed by the extension.

[source,java]
----
public interface PythonExtension extends
        StreamStage.StageExtension<String, PythonExtension.PythonStage<StreamStage<String>>>,
        BatchStage.StageExtension<String, PythonExtension.PythonStage<BatchStage<String>>> { // (1)

    interface PythonStage<S extends GeneralStage<String>> { // (2)
        S map(PythonServiceConfig cfg); // (3)
    }
}
----

Initially, the example Python extension supports only a single `map` method *(3)*.
To handle both `StreamStage` and `BatchStage` in a type-safe way, `PythonStage` uses the generic parameter `S` *(2)*
so that the `map` method returns the correct stage type (stream or batch) for further chaining.

`StageExtension` needs to know the type that serves as the stage API of the extension, so it is specified as the second type argument in *(1)*.
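
The effect of the `S` type parameter can be tried out in isolation.
The following is a minimal, self-contained sketch: `ToyStage`, `ToyStream`, `ToyBatch` and `ToyExtStage` are hypothetical stand-ins invented for illustration and are not part of the Hazelcast API.

[source,java]
----
// Hypothetical stand-ins for GeneralStage, StreamStage and BatchStage.
interface ToyStage {
    String name();
}

final class ToyStream implements ToyStage {
    @Override
    public String name() {
        return "stream";
    }

    // Operation that exists only on the stream flavor.
    String window() {
        return "windowed " + name();
    }
}

final class ToyBatch implements ToyStage {
    @Override
    public String name() {
        return "batch";
    }

    // Operation that exists only on the batch flavor.
    String sort() {
        return "sorted " + name();
    }
}

// Counterpart of PythonStage<S>: map() returns the stage kind it started from.
interface ToyExtStage<S extends ToyStage> {
    S map();
}

final class TypedReturnSketch {
    // Wraps a stage; S is inferred from the argument.
    static <S extends ToyStage> ToyExtStage<S> wrap(S stage) {
        return () -> stage;
    }

    static String streamChain() {
        // Compiles only because map() is known to return ToyStream.
        return wrap(new ToyStream()).map().window();
    }

    static String batchChain() {
        return wrap(new ToyBatch()).map().sort();
    }

    public static void main(String[] args) {
        System.out.println(streamChain()); // windowed stream
        System.out.println(batchChain());  // sorted batch
    }
}
----

If `map()` returned the common `ToyStage` type instead of `S`, neither `window()` nor `sort()` could be chained without a cast at the call site.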

== Step 3. Define entry point for the extension

A static, parameterless method *(1)* is the recommended way to provide an entry point to the extension for the `stage.using()` invocation,
because it can be statically imported and produces a readable, fluent syntax.

Note that the returned object provides implementations for the different stage types *(2)*; the Java compiler chooses the appropriate variant automatically.
If there are generic types (for example, the stream item type), the compiler infers them, so you do not need to specify them explicitly.

[source,java]
----
public interface PythonExtension extends
        StreamStage.StageExtension<String, PythonExtension.PythonStage<StreamStage<String>>>,
        BatchStage.StageExtension<String, PythonExtension.PythonStage<BatchStage<String>>> {

    static PythonExtension python() { // (1)
        return new PythonExtensionImpl(); // (2)
    }
}

final class PythonExtensionImpl implements PythonExtension {
}
----
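
How a single entry point can serve several stage types can be modeled without any Hazelcast dependency.
The sketch below assumes that each stage type's `using` method accepts that stage's own extension interface; all names in it (`MiniStream`, `MiniBatch`, `Labeller` and so on) are hypothetical and only mimic the shape of the real API.

[source,java]
----
// Hypothetical stand-ins for StreamStage.StageExtension and BatchStage.StageExtension.
interface MiniStreamExtension<R> {
    R extend(MiniStream stage);
}

interface MiniBatchExtension<R> {
    R extend(MiniBatch stage);
}

final class MiniStream {
    // Accepts only extensions that support stream stages.
    <R> R using(MiniStreamExtension<R> extension) {
        return extension.extend(this);
    }
}

final class MiniBatch {
    // Accepts only extensions that support batch stages.
    <R> R using(MiniBatchExtension<R> extension) {
        return extension.extend(this);
    }
}

// One extension type that can be applied to both stage kinds.
interface Labeller extends MiniStreamExtension<String>, MiniBatchExtension<String> {
    // Static, parameterless entry point, suitable for a static import.
    static Labeller labeller() {
        return new LabellerImpl();
    }
}

final class LabellerImpl implements Labeller {
    @Override
    public String extend(MiniStream stage) {
        return "stream variant";
    }

    @Override
    public String extend(MiniBatch stage) {
        return "batch variant";
    }
}

final class EntryPointSketch {
    static String onStream() {
        return new MiniStream().using(Labeller.labeller());
    }

    static String onBatch() {
        return new MiniBatch().using(Labeller.labeller());
    }

    public static void main(String[] args) {
        System.out.println(onStream()); // stream variant
        System.out.println(onBatch());  // batch variant
    }
}
----

The call sites are identical for both stage kinds, and the result type `R` is inferred as `String` in both cases.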

== Step 4. Implement the extension

There are two pieces left to be implemented: the extension class (`PythonExtensionImpl`) and the custom stage (`PythonStage`).

[source,java]
----
final class PythonExtensionImpl implements PythonExtension {

    @Override
    public PythonStage<StreamStage<String>> extend(StreamStage<String> streamStage) { // (1)
        return new GeneralPythonStage<>(streamStage);
    }

    @Override
    public PythonStage<BatchStage<String>> extend(BatchStage<String> batchStage) { // (2)
        return new GeneralPythonStage<>(batchStage);
    }
}

static class GeneralPythonStage<S extends GeneralStage<String>> implements PythonStage<S> {
    private final S stage;

    GeneralPythonStage(S stage) {
        this.stage = stage; // (3)
    }

    @Override
    public S map(PythonServiceConfig cfg) {
        return (S) stage.mapUsingServiceAsyncBatched( // (4)
                        PythonService.factory(cfg), // (5)
                        PythonTransforms.DEFAULT_MAX_BATCH_SIZE,
                        PythonService::sendRequest)
                .setName("mapUsingPython"); // (6)
    }
}
----

The extension class is simply an implementation of the visitor pattern *(1)* *(2)* that creates the appropriate custom stage implementation.
It receives a reference to the current stage in the pipeline, which can be used to implement the extension's logic using existing infrastructure,
for example `mapUsingService`.

The custom stage remembers the `stage` *(3)* so it can be used to implement the `map` method.
The `map` method invokes the standard `mapUsingServiceAsyncBatched` method *(4)*, wiring in the extension-specific logic (service factory, mapping method) *(5)*.
It also provides a default stage name *(6)*.
The created stage is returned, so you can use standard `Stage` methods such as `setName` or `setLocalParallelism` to customize the newly created stage.

The result must be cast to `S` because in this context `mapUsingServiceAsyncBatched` returns `GeneralStage` *(4)*.
In the case of the Python extension, the cast is safe: `mapUsingServiceAsyncBatched` does not change a `StreamStage` to a `BatchStage`, so the returned stage is always of the same kind as the stage the extension was applied to, and the item type is still `String`.
In general, however, especially if the item type changes, you may need to handle `StreamStage` and `BatchStage` explicitly.
You can see how this can be implemented in `mapUsingIMap`, for example:

1. `GeneralStage` provides the base method definitions, Javadoc and some shared default implementations. Its methods return `GeneralStage`.
2. `StreamStage` and `BatchStage` override the methods to return the correct stage type: they invoke the base method and cast the result.
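
The two points above can be sketched as a small, self-contained model.
`GeneralPipe`, `StreamPipe`, `StreamPipeImpl` and the `mapUsingLookup` method are hypothetical names chosen for illustration; this is not the Hazelcast source code, only the same override-and-cast idea in miniature.

[source,java]
----
// Base interface: declares the method and supplies the shared default implementation.
interface GeneralPipe {
    // Low-level primitive implemented by the concrete stage class.
    GeneralPipe append(String operation);

    // Shared logic; in the base interface it can only promise the general type.
    default GeneralPipe mapUsingLookup(String mapName) {
        return append("lookup(" + mapName + ")");
    }
}

// Specialized interface: narrows the return types (covariant overrides).
interface StreamPipe extends GeneralPipe {
    @Override
    StreamPipe append(String operation);

    @Override
    default StreamPipe mapUsingLookup(String mapName) {
        // Invoke the base method and cast the result to the specific stage type.
        return (StreamPipe) GeneralPipe.super.mapUsingLookup(mapName);
    }
}

final class StreamPipeImpl implements StreamPipe {
    private final String plan;

    StreamPipeImpl(String plan) {
        this.plan = plan;
    }

    @Override
    public StreamPipe append(String operation) {
        return new StreamPipeImpl(plan + " -> " + operation);
    }

    @Override
    public String toString() {
        return plan;
    }
}

final class CovariantSketch {
    static String demo() {
        // No cast needed at the call site: the static type is StreamPipe.
        StreamPipe result = new StreamPipeImpl("source").mapUsingLookup("dict");
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // source -> lookup(dict)
    }
}
----

The cast in `StreamPipe.mapUsingLookup` succeeds at runtime because the concrete `append` implementation always returns a `StreamPipe`.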

== Step 5. Test the extension

Extensions can be tested by using them in a test pipeline.

[source,java]
----
import static com.hazelcast.jet.python.PythonExtension.python;

public void streamStage_mapUsingPython_extension() {
    // Given
    PythonServiceConfig cfg = new PythonServiceConfig()
            .setBaseDir(baseDir.toString())
            .setHandlerModule("echo")
            .setHandlerFunction("handle");
    List<String> items = IntStream.range(0, ITEM_COUNT).mapToObj(Integer::toString).collect(toList());
    Pipeline p = Pipeline.create();
    var stage = p.readFrom(TestSources.items(items)).addTimestamps(x -> 0, 0);

    // When
    var mapped = stage.using(python())
            .map(cfg) // (1)
            .setName("python-echo") // (2)
            .setLocalParallelism(2);

    // Then
    mapped.writeTo(AssertionSinks.assertAnyOrder(
            "Python didn't map the items correctly", items.stream().map(i -> "echo-" + i).collect(toList())
    ));
    instance().getJet().newJob(p).join();
}
----

The extension invocation *(1)* is very simple: `stage.using(python()).map(cfg)`.

`map` returns one of the standard `Stage` interfaces, so you can customize it in the same way as with standard transforms *(2)*, by giving it a more specific name (`setName`) or configuring parallelism (`setLocalParallelism`).

== Step 6. Add more capabilities

Once the basic structure works, you can add more methods and capabilities.
Because you control the return types, you can implement any fluent API as long as it ultimately returns to one of the standard stages.

The Python extension, for example, has a fluent builder API to prepare a `PythonServiceConfig` on the fly:

[source,java]
----
StreamStage<String> sourceStage;
var mapped = sourceStage.using(python())
        .baseDir("/tmp")
        .handlerModule("echo")
        .handlerFunction("handle")
        .maxBatchSize(1)
        .map().setName("python-echo").setLocalParallelism(2);
----
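
One way such a builder-style stage could be structured is sketched below.
It is a self-contained model, not the actual `PythonExtension` code: `PlanStage` and `CallStage` are hypothetical classes, and the default values are arbitrary choices made for the sketch.

[source,java]
----
// Hypothetical stand-in for a standard stage; it only records what was applied.
final class PlanStage {
    private final String plan;

    PlanStage(String plan) {
        this.plan = plan;
    }

    PlanStage apply(String operation) {
        return new PlanStage(plan + " -> " + operation);
    }

    PlanStage setName(String name) {
        return new PlanStage(plan + " [" + name + "]");
    }

    String plan() {
        return plan;
    }
}

// Custom stage API: collects settings fluently, then returns to the standard stage.
final class CallStage {
    private final PlanStage stage;
    private String module = "handler";  // arbitrary default for the sketch
    private String function = "handle"; // arbitrary default for the sketch
    private int maxBatchSize = 128;     // arbitrary default for the sketch

    CallStage(PlanStage stage) {
        this.stage = stage;
    }

    CallStage handlerModule(String module) {
        this.module = module;
        return this;
    }

    CallStage handlerFunction(String function) {
        this.function = function;
        return this;
    }

    CallStage maxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
        return this;
    }

    // Terminal method: leaves the custom API and hands back a standard stage.
    PlanStage map() {
        return stage.apply("call " + module + "." + function + " (batch " + maxBatchSize + ")");
    }
}

final class FluentSketch {
    static String build() {
        return new CallStage(new PlanStage("source"))
                .handlerModule("echo")
                .handlerFunction("handle")
                .maxBatchSize(1)
                .map()
                .setName("python-echo")
                .plan();
    }

    public static void main(String[] args) {
        System.out.println(build()); // source -> call echo.handle (batch 1) [python-echo]
    }
}
----

Each setter returns the custom stage itself, so settings can be chained in any order; only the terminal `map()` switches back to the standard stage type.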

== Summary

In this tutorial, you learned how to implement and use a custom stage extension in the Jet Pipeline API.

== Next steps

1. Review the production-ready `PythonExtension` https://github.com/hazelcast/hazelcast/blob/master/extensions/python/src/main/java/com/hazelcast/jet/python/PythonExtension.java[API] and https://github.com/hazelcast/hazelcast/blob/master/extensions/python/src/main/java/com/hazelcast/jet/python/PythonExtensionImpl.java[implementation] in the Hazelcast repository.
2. Check the other xref:extensions.adoc#_builtin_extensions[available extensions].
3. Implement your own extension to simplify your pipeline, wrap a complex service, or reuse some logic.
30 changes: 30 additions & 0 deletions docs/modules/pipelines/pages/extensions.adoc
@@ -0,0 +1,30 @@
= Stage Extensions

Stage extensions provide a way to add more capabilities to the Pipeline API
while keeping its familiar and convenient fluent style for defining the pipeline.
Extensions are applied in the pipeline with the xref:transforms.adoc#_using[`using` transform].

[#_builtin_extensions]
== Built-in extensions

The Hazelcast distribution includes the following extensions.

.Extensions
|===
|Extension | Module | Batch/Streaming | Description

|xref:python.adoc[PythonExtension]
|`hazelcast-jet-python`
|batch, streaming
|Fluent API for Python invocation

|xref:pipelines:map-transforms.adoc#_map_transforms[IMapExtension] ([.enterprise]*{enterprise-product-name}*)
|`hazelcast-enterprise`
|batch, streaming
|Convenience methods for using IMaps in pipeline transformations

|===

== Custom extensions

You can implement your own extensions. For a tutorial, see xref:custom-extension.adoc[].
84 changes: 84 additions & 0 deletions docs/modules/pipelines/pages/map-transforms.adoc
@@ -0,0 +1,84 @@
= Map Transforms

You can use an IMap to transform items in the pipeline, for example to look up dictionary values or enrich the data.
The simplest transformation, xref:pipelines:transforms.adoc#_mapusingimap[`mapUsingIMap`], is available in the stage API itself.

[#_map_transforms]
== IMapExtension ([.enterprise]*{enterprise-product-name}*)

More advanced transformations are provided in `IMapExtension`:

- `mapUsingPutIfAbsent`: performs `putIfAbsent` and passes the previous value to a custom mapping function

All these transformations provide `AT_LEAST_ONCE` processing guarantees.
They are not transactional; instead, they rely on sources to retry events in case of failure.
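
One consequence of at-least-once delivery is worth keeping in mind: if an item is delivered again after its `putIfAbsent` has already taken effect, the retried call sees the value stored by the first attempt rather than `null`.
The sketch below illustrates this reasoning with a plain `java.util.HashMap` as a stand-in for the IMap; `deliver` is a hypothetical helper, and whether a replay actually occurs in a real job depends on the source and the failure.

[source,java]
----
import java.util.HashMap;
import java.util.Map;

final class ReplaySketch {
    // Simulates one delivery of an item; returns the previous value for its key.
    static Integer deliver(Map<Integer, Integer> map, int item) {
        return map.putIfAbsent(item % 10, item);
    }

    static String run() {
        Map<Integer, Integer> map = new HashMap<>();
        Integer firstDelivery = deliver(map, 1);    // key 1 is absent, so null is returned
        Integer replayedDelivery = deliver(map, 1); // same item again, sees its own value
        return firstDelivery + "," + replayedDelivery;
    }

    public static void main(String[] args) {
        System.out.println(run()); // null,1
    }
}
----

A mapping function that treats a non-null previous value as a duplicate key may therefore need to tolerate the case where the previous value equals the item itself.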

It is possible to customize some parameters of the transformation:

- `maxConcurrentOps` - sets maximum number of concurrent async operations
- `doNotPreserveOrder` - disables preservation of ordering of the input items

These parameters must be configured before `mapUsingPutIfAbsent` is invoked.
Some standard stage parameters can also be configured for the created stage:

- `setName` - more meaningful stage name
- `setLocalParallelism` - local parallelism of the stage


The following example records the first occurrence of a key from a given bucket
and detects subsequent occurrences, which allows you to react to them differently.

[source,java]
----
import static com.hazelcast.jet.pipeline.IMapExtension.iMapExtension;

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(1, 2, 3, 11, 12, 13, 21, 22, 23))
        // rebalance is used to guarantee order of items mapping to the same IMap key
        .rebalance(e -> e % 10)
        .using(iMapExtension())
        .mapUsingPutIfAbsent("some-map",
                e -> e % 10,
                e -> e,
                Tuple3::tuple3)
        .writeTo(Sinks.logger());
----

It should produce output similar to the following. Each tuple contains, in order: the item, the previous IMap value, and the value that `putIfAbsent` attempted to insert.
Notice that some entries appear in a different order due to parallelism, but the invariant is preserved: only the first occurrence updates the IMap.

[source]
----
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (2, null, 2)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (3, null, 3)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (12, 2, 12)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (13, 3, 13)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (22, 2, 22)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (23, 3, 23)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (1, null, 1)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (11, 1, 11)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (21, 1, 21)

Final IMap contents: [1=1, 2=2, 3=3]
----
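
The tuples above can be reproduced in a simplified, single-threaded form with a plain `java.util.Map`, which makes the invariant easy to check by hand.
This is only an illustration of the `putIfAbsent` semantics: it uses no Hazelcast classes, `PutIfAbsentSketch` and `process` are hypothetical names, and the items are processed strictly in input order, unlike in a parallel job.

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PutIfAbsentSketch {
    // Processes items in order and formats (item, previous value, attempted value).
    static List<String> process(Map<Integer, Integer> map, int... items) {
        List<String> output = new ArrayList<>();
        for (int item : items) {
            int key = item % 10;                           // bucket, as in the pipeline example
            Integer previous = map.putIfAbsent(key, item); // null only for the first occurrence
            output.add("(" + item + ", " + previous + ", " + item + ")");
        }
        return output;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> map = new TreeMap<>();
        for (String line : process(map, 1, 2, 3, 11, 12, 13, 21, 22, 23)) {
            System.out.println(line);
        }
        System.out.println("Final map contents: " + map); // Final map contents: {1=1, 2=2, 3=3}
    }
}
----

For example, item `11` yields `(11, 1, 11)`: its bucket key `1` already holds the value `1`, so the map is left unchanged.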

The following example demonstrates how to use all available settings:

[source,java]
----
import static com.hazelcast.jet.pipeline.IMapExtension.iMapExtension;

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(1, 2, 3, 11, 12, 13, 21, 22, 23))
        .rebalance(e -> e % 10)
        .using(iMapExtension())
        .doNotPreserveOrder()
        .maxConcurrentOps(32)
        .mapUsingPutIfAbsent("some-map",
                e -> e % 10,
                e -> e,
                Tuple3::tuple3)
        .setName("store-first-by-bucket")
        .setLocalParallelism(8)
        .writeTo(Sinks.logger());
----