diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc
index ae0cebed1..c66c413c0 100644
--- a/docs/modules/ROOT/nav.adoc
+++ b/docs/modules/ROOT/nav.adoc
@@ -211,7 +211,10 @@
** Processing data
*** xref:pipelines:transforms.adoc[]
*** xref:pipelines:custom-aggregate-operation.adoc[]
+*** xref:pipelines:extensions.adoc[]
*** xref:pipelines:python.adoc[]
+*** xref:pipelines:map-transforms.adoc[]
+*** xref:pipelines:custom-extension.adoc[]
** Sending results to sinks
*** xref:pipelines:sending-data-to-sinks.adoc[Overview]
*** xref:pipelines:custom-sink.adoc[]
diff --git a/docs/modules/integrate/pages/map-connector.adoc b/docs/modules/integrate/pages/map-connector.adoc
index 028a65fed..992b31c8c 100644
--- a/docs/modules/integrate/pages/map-connector.adoc
+++ b/docs/modules/integrate/pages/map-connector.adoc
@@ -2,6 +2,7 @@
:description: A map is a distributed in-memory key-value data structure that can be used as a batch or streaming data source as well as a data sink.
A map is a distributed in-memory key-value data structure that can be used as a batch or streaming data source as well as a data sink. See xref:data-structures:map.adoc[].
+IMaps can also be used in pipeline transforms, see xref:pipelines:map-transforms.adoc[].
== Installing the Connector
diff --git a/docs/modules/pipelines/pages/custom-extension.adoc b/docs/modules/pipelines/pages/custom-extension.adoc
new file mode 100644
index 000000000..db49790e7
--- /dev/null
+++ b/docs/modules/pipelines/pages/custom-extension.adoc
@@ -0,0 +1,202 @@
+= Create a Custom Stage Extension
+
+It is possible to implement custom reusable xref:extensions.adoc[Pipeline API stage extension].
+This tutorial is based on `PythonExtension` available in Hazelcast and walks step-by-step
+how it is created.
+
+== Before you begin
+
+`PythonExtension` already exists, but in order to recreate it in steps according to this tutorial, you will need:
+
+1. A Java project with dependency to `hazelcast` and `hazelcast-jet-python` modules
+2. Python in version compatible with Jet (see xref:transforms.adoc#_mapusingpython[mapUsingPython requirements])
+3. A Python script with function to invoke
+
+You can reuse setup from xref:python.adoc#_before_you_begin[Apply a Custom Transform with Python] tutorial.
+
+== Step 1. Define supported stage types
+
+First step is to decide to what stages the extension will be applicable.
+It is done by implementing `StageExtension` interface from appropriate Stage class.
+Most commonly `StreamStage` and `BatchStage` will be used and often it is relatively easy
+to implement extension for both kinds of stages and it makes it more flexible.
+In some cases you may want to also support `StreamStageWithKey` and `BatchStageWithKey`.
+
+[source,java]
+----
+public interface PythonExtension extends
+ StreamStage.StageExtension,
+ BatchStage.StageExtension {
+}
+----
+
+The Python extension supports both stream and batch stage.
+Python transformation works only with `String`.
+In more general cases this should be a generic type parameter.
+
+== Step 2. Define API for the extension
+
+Next you need to define the fluent API that will be available thanks to the extension.
+It is recommended to use interfaces to clearly separate API from implementation.
+
+Usually the extension will ultimately return to the original `StreamStage` and `BatchStage` API,
+but possibly with changed item type eg. due to mapping performed via extension.
+
+[source,java]
+----
+public interface PythonExtension extends
+ StreamStage.StageExtension>>,
+ BatchStage.StageExtension>> { // (1)
+
+ interface PythonStage> { // (2)
+ S map(PythonServiceConfig cfg); // (3)
+ }
+}
+----
+
+Initially the example Python extension will support only single `map` method *(3)*.
+In order to handle both `StreamStage` and `BatchStage` in type-safe way we use generic `S` parameter *(2)*
+so that `map` method returns correct stage type (stream or batch) for further chaining.
+
+`StageExtension` needs to know the type that will be used as stage API for extensions, so it is configured in *(1)*.
+
+== Step 3. Define entry point for the extension
+
+Static parameterless method *(1)* is a recommended way to provide entry point to the extension for `stage.using()` invocation
+as it can be statically imported and produces a readable, fluent syntax.
+
+Note that the method provides implementations for different stage types *(2)* - Java compiler will choose appropriate variant automatically.
+If there are generic types (e.g. stream item type), the compiler will infer them properly avoiding the need to specify them explicitly.
+
+[source,java]
+----
+public interface PythonExtension extends
+ StreamStage.StageExtension>>,
+ BatchStage.StageExtension>> {
+
+ static PythonExtension python() { // (1)
+ return new PythonExtensionImpl(); // (2)
+ }
+}
+
+final class PythonExtensionImpl implements PythonExtension {
+}
+----
+
+== Step 4. Implement the extension
+
+There are two pieces left to be implemented: the extension class (`PythonExtensionImpl`) and the custom stage (`PythonStage`).
+
+[source,java]
+----
+final class PythonExtensionImpl implements PythonExtension {
+
+ @Override
+ public PythonStage> extend(StreamStage streamStage) { // (1)
+ return new GeneralPythonStage<>(streamStage);
+ }
+
+ @Override
+ public PythonStage> extend(BatchStage batchStage) { // (2)
+ return new GeneralPythonStage<>(batchStage);
+ }
+}
+
+static class GeneralPythonStage> implements PythonStage {
+ private final S stage;
+
+ GeneralPythonStage(S stage) {
+ this.stage = stage; // (3)
+ }
+
+ @Override
+ public S map(PythonServiceConfig cfg) {
+ return (S) stage.mapUsingServiceAsyncBatched( // (4)
+ PythonService.factory(cfg), // (5)
+ PythonTransforms.DEFAULT_MAX_BATCH_SIZE,
+ PythonService::sendRequest)
+ .setName("mapUsingPython"); // (6)
+ }
+}
+----
+
+Extension class is just an implementation of a visitor pattern *(1)* *(2)* that creates appropriate custom stage implementation.
+It gets a reference to current stage in the pipeline which can be used to implement the extensions logic using existing infrastructure,
+for example `mapUsingService`.
+
+The custom stage remembers the `stage` *(3)* so it can be used to implement the `map` method.
+`map` methods invokes standard `mapUsingServiceAsyncBatched` method *(4)* wiring extension-specific logic (service factory, mapping method) *(5)*.
+We also provide a default stage name *(6)*.
+The created stage is returned, so it is possible to use standard `Stage` methods like `setName` or `setLocalParallelism` to customize the just-created stage.
+
+The result must be cast to `S` because in this context `mapUsingServiceAsyncBatched` returns `GeneralStage` *(4)*.
+In case of the Python extension, functions will always return the same stage type as it was before applying the extension as `mapUsingServiceAsyncBatched` does not change `StreamStage` to `BatchStage`; item type will still be `String` too.
+In general however, especially if the item type changes, you may need to handle `StreamStage` and `BatchStage` explicitly.
+You can see how this can be implemented for example in `mapUsingIMap`:
+
+1. `GeneralStage` provides base method definitions, javadoc and some shared default implementations. Methods return `GeneralStage`
+2. `StreamStage` and `BatchStage` override the methods to return correct stage type, invoke base method and cast the result
+
+== Step 5. Test the extension
+
+Extensions can be tested by using them in a test pipeline.
+
+[source,java]
+----
+import static com.hazelcast.jet.python.PythonExtension.python;
+
+public void streamStage_mapUsingPython_extension() {
+ // Given
+ PythonServiceConfig cfg = new PythonServiceConfig()
+ .setBaseDir(baseDir.toString())
+ .setHandlerModule("echo")
+ .setHandlerFunction("handle");
+ List items = IntStream.range(0, ITEM_COUNT).mapToObj(Integer::toString).collect(toList());
+ Pipeline p = Pipeline.create();
+ var stage = p.readFrom(TestSources.items(items)).addTimestamps(x -> 0, 0);
+
+ // When
+ var mapped = stage.using(python())
+ .map(cfg) // (1)
+ .setName("python-echo") // (2)
+ .setLocalParallelism(2);
+
+ // Then
+ mapped.writeTo(AssertionSinks.assertAnyOrder(
+ "Python didn't map the items correctly", items.stream().map(i -> "echo-" + i).collect(toList())
+ ));
+ instance().getJet().newJob(p).join();
+}
+----
+
+The extension invocation *(1)* is very simple: `stage.using(python()).map(cfg)`.
+
+`map` returns one of the standard `Stage` interfaces, so you can customize it in the same way as with standard transforms *(2)*, by giving it a more specific name (`setName`) or configuring parallelism (`setLocalParallelism`).
+
+== Step 6. Add more capabilities
+
+Once the basic structure works, you can add more methods and capabilities.
+You can leverage the fact that you control the return type, so any fluent API can be implemented as long as you ultimately return to one of the standard stages.
+
+Python extension for example has a fluent builder API to prepare `PythonServiceConfig` on the fly:
+
+[source,java]
+----
+StreamStage sourceStage;
+var mapped = sourceStage.using(python())
+ .baseDir("/tmp")
+ .handlerModule("echo")
+ .handlerFunction("handle")
+ .maxBatchSize(1)
+ .map().setName("python-echo").setLocalParallelism(2);
+----
+
+== Summary
+
+In this tutorial, you learned how to implement and use custom stage extension in Jet Pipeline API.
+
+== Next steps
+
+1. Review the `PythonExtension` production-ready https://github.com/hazelcast/hazelcast/blob/master/extensions/python/src/main/java/com/hazelcast/jet/python/PythonExtension.java[API] and https://github.com/hazelcast/hazelcast/blob/master/extensions/python/src/main/java/com/hazelcast/jet/python/PythonExtensionImpl.java[implementation] in the Hazelcast repository
+2. Check other xref:extensions.adoc#_builtin_extensions[available extensions]
+3. Implement your own extension to simplify your pipeline, wrap a complex service or reuse some logic
diff --git a/docs/modules/pipelines/pages/extensions.adoc b/docs/modules/pipelines/pages/extensions.adoc
new file mode 100644
index 000000000..d92bf9164
--- /dev/null
+++ b/docs/modules/pipelines/pages/extensions.adoc
@@ -0,0 +1,30 @@
+= Stage Extensions
+
+Stage extensions provide a way to add more capabilities to Pipeline API.
+They maintain the familiar and convenient fluent API for defining the pipeline.
+Extensions are applied in the pipeline by xref:transforms.adoc#_using[`using` transform].
+
+[#_builtin_extensions]
+== Builtin extensions
+
+There are some extensions available in the Hazelcast distribution.
+
+.Extensions
+|===
+|Extension | Module | Batch/Streaming | Description
+
+|xref:python.adoc[PythonExtension]
+|`hazelcast-jet-python`
+|batch, streaming
+|Fluent API for Python invocation
+
+|xref:pipelines:map-transforms.adoc#_map_transforms[IMapExtension] ([.enterprise]*{enterprise-product-name}*)
+|`hazelcast-enterprise`
+|batch, streaming
+|Convenience methods for using IMaps in pipeline transformations
+
+|===
+
+== Custom extensions
+
+It is possible to implement custom extensions. For a tutorial see xref:custom-extension.adoc[].
\ No newline at end of file
diff --git a/docs/modules/pipelines/pages/map-transforms.adoc b/docs/modules/pipelines/pages/map-transforms.adoc
new file mode 100644
index 000000000..51365cdf0
--- /dev/null
+++ b/docs/modules/pipelines/pages/map-transforms.adoc
@@ -0,0 +1,84 @@
+= Map Transforms
+
+You can use IMap to perform transformations of the items in the pipeline, for example to lookup dictionary values or enrich the data.
+The simplest xref:pipelines:transforms.adoc#_mapusingimap[`mapUsingIMap` transformation] is available in the stage API itself.
+
+[#_map_transforms]
+== IMapExtension ([.enterprise]*{enterprise-product-name}*)
+
+More advanced transformations are provided in `IMapExtension`:
+
+- `mapUsingPutIfAbsent`: performs `putIfAbsent` and provides previous value to custom mapping function
+
+All these transformations provide `AT_LEAST_ONCE` processing guarantees.
+They are not transactional, but rely on sources to retry events in case of failure.
+
+It is possible to customize some parameters of the transformation:
+
+- `maxConcurrentOps` - sets maximum number of concurrent async operations
+- `doNotPreserveOrder` - disables preservation of ordering of the input items
+
+These parameters must be configured before `mapUsingPutIfAbsent` is invoked.
+Some standard stage parameters can also be configured for the created stage:
+
+- `setName` - more meaningful stage name
+- `setLocalParallelism` - local parallelism of the stage
+
+
+The following example records first occurrence of a key from given bucket
+and detects subsequent occurrences allowing to react differently to them.
+
+[source,java]
+----
+import static com.hazelcast.jet.pipeline.IMapExtension.iMapExtension;
+
+Pipeline p = Pipeline.create();
+p.readFrom(TestSources.items(1, 2, 3, 11, 12, 13, 21, 22, 23))
+ // rebalance is used to guarantee order of items mapping to the same IMap key
+ .rebalance(e -> e % 10)
+ .using(iMapExtension())
+ .mapUsingPutIfAbsent("some-map",
+ e -> e % 10,
+ e -> e,
+ Tuple3::tuple3)
+ .writeTo(Sinks.logger());
+----
+
+It should produce output similar as below. The tuple contains in order: item, previous IMap value, value attempted to be inserted in `putIfAbsent`.
+Notice that some entries appear in different order due to parallelism, but the invariant is preserved - only first occurrence updates the IMap.
+
+[source]
+----
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (2, null, 2)
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (3, null, 3)
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (12, 2, 12)
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (13, 3, 13)
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (22, 2, 22)
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (23, 3, 23)
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (1, null, 1)
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (11, 1, 11)
+[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (21, 1, 21)
+
+Final IMap contents: [1=1, 2=2, 3=3]
+----
+
+The following example demonstrates how to use all available settings:
+
+[source,java]
+----
+import static com.hazelcast.jet.pipeline.IMapExtension.iMapExtension;
+
+Pipeline p = Pipeline.create();
+p.readFrom(TestSources.items(1, 2, 3, 11, 12, 13, 21, 22, 23))
+ .rebalance(e -> e % 10)
+ .using(iMapExtension())
+ .doNotPreserveOrder()
+ .maxConcurrentOps(32)
+ .mapUsingPutIfAbsent("some-map",
+ e -> e % 10,
+ e -> e,
+ Tuple3::tuple3)
+ .setName("store-first-by-bucket")
+ .setLocalParallelism(8)
+ .writeTo(Sinks.logger());
+----
diff --git a/docs/modules/pipelines/pages/python.adoc b/docs/modules/pipelines/pages/python.adoc
index 81b72e529..7d0a1b0ad 100644
--- a/docs/modules/pipelines/pages/python.adoc
+++ b/docs/modules/pipelines/pages/python.adoc
@@ -100,9 +100,49 @@ We'll assume you're using an IDE. Create a blank Java project named
== Step 3. Apply the Python Function to a Pipeline
This code generates a stream of numbers and lets Python take their
-square roots. Make sure to set the right path in the `.setBaseDir` line:
+square roots. Make sure to set the right path in the `.baseDir` line:
-```java
+[tabs]
+====
+New API (`PythonExtension`)::
++
+--
+[source,java]
+----
+package org.example;
+
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.jet.config.JobConfig;
+import com.hazelcast.jet.pipeline.*;
+import com.hazelcast.jet.pipeline.test.TestSources;
+import com.hazelcast.jet.python.PythonServiceConfig;
+
+import static com.hazelcast.jet.python.PythonExtension.python;
+
+public class JetJob {
+ public static void main(String[] args) {
+ Pipeline pipeline = Pipeline.create();
+ pipeline.readFrom(TestSources.itemStream(10, (ts, seq) -> String.valueOf(seq)))
+ .withoutTimestamps()
+ .using(python())
+ .baseDir("")
+ .handlerModule("take_sqrt")
+ .map()
+ .setLocalParallelism(1)
+ .writeTo(Sinks.logger());
+
+ JobConfig cfg = new JobConfig().setName("python-function");
+ HazelcastInstance hz = Hazelcast.bootstrappedInstance();
+ hz.getJet().newJob(pipeline, cfg);
+ }
+}
+----
+--
+Old API (`PythonTransforms`)::
++
+[source,java]
+----
package org.example;
import com.hazelcast.core.Hazelcast;
@@ -130,7 +170,8 @@ public class JetJob {
hz.getJet().newJob(pipeline, cfg);
}
}
-```
+----
+====
You may run this code from your IDE and it will work, but it will create
its own Hazelcast member. `bin/hz-cli` directory is in the distribution which is downloaded before. To run it on the
diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc
index 915b8ae9f..f10231147 100644
--- a/docs/modules/pipelines/pages/transforms.adoc
+++ b/docs/modules/pipelines/pages/transforms.adoc
@@ -67,6 +67,7 @@ StreamStage tradesTokyo = pipeline
StreamStage tradesNyAndTokyo = tradesNewYork.merge(tradesTokyo);
```
+[#_mapusingimap]
=== mapUsingIMap
This transform looks up each incoming item from the corresponding
@@ -299,11 +300,27 @@ If you have some simple Python work that fits into a single file, you
can tell Hazelcast just the name of that file, which is assumed to be a Python
module file that declares `transform_list`:
-```java
+[tabs]
+====
+New API (`PythonExtension`)::
++
+--
+[source,java]
+----
+StreamStage sourceStage = sourceStage();
+StreamStage pythonMapped = sourceStage.using(PythonExtension.python()).map(
+ new PythonServiceConfig().setHandlerFile("path/to/handler.py"));
+----
+--
+Old API (`PythonTransforms`)::
++
+[source,java]
+----
StreamStage sourceStage = sourceStage();
StreamStage pythonMapped = sourceStage.apply(PythonTransforms.mapUsingPython(
new PythonServiceConfig().setHandlerFile("path/to/handler.py")));
-```
+----
+====
And here's an example of `handler.py`:
@@ -317,13 +334,30 @@ name its base directory and Hazelcast will upload all it (recursively) to
the cluster as a part of the submitted job. In this case you must also
name the Python module that declares `transform_list`:
-```java
+
+[tabs]
+====
+New API (`PythonExtension`)::
++
+--
+[source,java]
+----
+StreamStage sourceStage = sourceStage();
+StreamStage pythonMapped = sourceStage.using(PythonExtension.python()).map(
+ new PythonServiceConfig().setBaseDir("path/to/python_project")
+ .setHandlerModule("jet_handler"));
+----
+--
+Old API (`PythonTransforms`)::
++
+[source,java]
+----
StreamStage sourceStage = sourceStage();
StreamStage pythonMapped = sourceStage.apply(PythonTransforms.mapUsingPython(
new PythonServiceConfig().setBaseDir("path/to/python_project")
.setHandlerModule("jet_handler"))
-);
-```
+----
+====
Normally your Python code will make use of non-standard libraries. Hazelcast
recognizes the conventional `requirements.txt` file in your project's
@@ -1069,7 +1103,76 @@ side-effects: it logs every item it receives. Since the logging happens
on the machine that is processing the item, this transform is primarily
intended to be used during development.
-== customTransform
+== Custom transforms
+
+Pipeline API cannot cover all possible use cases, but it gives a few ways in which custom transformations can be added to the pipeline.
+They differ in their main intended purpose:
+
+- `apply` for reuse of common pipeline fragments or simple logic
+- `using` for more comprehensive fluent API for integration with other services or complex logic
+- `customTransform` for a low-level use of Jet features
+
+=== apply
+
+`stage.apply` allows you to extract common pipeline transformations into a reusable method
+and then use that method without interrupting the chained pipeline expression.
+
+For example, say you have this code:
+
+[source,java]
+----
+StreamStage input = pipeline.readFrom(textSource);
+StreamStage cleanedUp = input
+ .map(String::toLowerCase)
+ .filter(s -> s.startsWith("success"));
+----
+
+You can capture the map and filter steps into a common "cleanup" transformation:
+
+[source,java]
+----
+StreamStage cleanUp(StreamStage input) {
+ return input.map(String::toLowerCase)
+ .filter(s -> s.startsWith("success"));
+}
+----
+
+NOTE: When defining the custom transformation you must decide if it should be applicable to `StreamStage` or `BatchStage`.
+If you want both, you need to create two methods.
+
+Now you can insert this transformation as just another step in your pipeline:
+
+[source,java]
+----
+StreamStage tokens = pipeline
+ .readFrom(textSource)
+ .apply(this::cleanUp)
+ .flatMap(line -> traverseArray(line.split("\\W+")));
+----
+
+[#_using]
+=== using
+
+`stage.using` enables use of xref:extensions.adoc[stage extension] that provides additional capabilities to the pipeline
+remaining as a convenient fluent API chained expression.
+
+[source,java]
+----
+p.readFrom(TestSources.items(1, 2, 3, 11, 12, 13))
+ // rebalance is used to guarantee order of items mapping to the same IMap key
+ .rebalance(e -> e % 10)
+ .using(iMapExtension())
+ .mapUsingPutIfAbsent("some-map",
+ e -> e % 10,
+ e -> e,
+ Tuple3::tuple3)
+ .writeTo(Sinks.logger());
+----
+
+There are some extensions available in the Hazelcast distribution, for a complete list see xref:extensions.adoc#_builtin_extensions[Builtin extensions].
+It is also possible to implement custom extensions. For a tutorial see xref:custom-extension.adoc[].
+
+=== customTransform
All the data processing in a pipeline happens inside the
implementations of the `Processor` interface, a central part of the Jet API. With `stage.customTransform` you can provide your own