
Commit 0022ff1

[FLINK-38686][doc] Add model table api documentation (apache#27243)
1 parent cd219d1

File tree: 2 files changed, +217 −0 lines

docs/content.zh/docs/dev/table/tableApi.md

Lines changed: 108 additions & 0 deletions
@@ -2735,6 +2735,114 @@ result = t.select(col('a'), col('c')) \
{{< query_state_warning_zh >}}

### Model Inference

{{< label Streaming >}}

The Table API supports model inference operations, allowing you to integrate machine learning models directly into your data processing pipelines. You can create models with a specific provider and use them to run inference on your data.

#### Creating and Using Models

Models are created with a `ModelDescriptor`, which specifies the provider, the input/output schemas, and configuration options. Once created, the model can be used to make predictions on tables.

{{< tabs "model-inference" >}}
{{< tab "Java" >}}

```java
// 1. Set up the local environment
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// 2. Create a source table from in-memory data
Table myTable = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
);

// 3. Create the model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate to chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
);

Model model = tEnv.fromModel("my_model");

// 4. Use the model to translate the text to Chinese
Table predictResult = model.predict(myTable, ColumnList.of("text"));

// 5. Async prediction example
Table asyncPredictResult = model.predict(
    myTable,
    ColumnList.of("text"),
    Map.of("async", "true")
);
```

{{< /tab >}}
{{< tab "Scala" >}}

```scala
// 1. Set up the local environment
val settings = EnvironmentSettings.inStreamingMode()
val tEnv = TableEnvironment.create(settings)

// 2. Create a source table from in-memory data
val myTable: Table = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
)

// 3. Create the model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate to chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
)

val model = tEnv.fromModel("my_model")

// 4. Use the model to translate the text to Chinese
val predictResult = model.predict(myTable, ColumnList.of("text"))

// 5. Async prediction example
val asyncPredictResult = model.predict(
    myTable,
    ColumnList.of("text"),
    Map("async" -> "true").asJava
)
```

{{< /tab >}}
{{< tab "Python" >}}

```python
# Not yet supported in the Python Table API
```

{{< /tab >}}
{{< /tabs >}}
Model inference supports both synchronous and asynchronous prediction modes (provided the underlying `ModelProvider` interface supports them). By default, the planner uses asynchronous prediction, which can improve throughput for high-latency models by allowing concurrent requests.
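
A call can opt back into synchronous prediction through the same runtime option map. The snippet below is a minimal sketch: it assumes the `async` option shown above also accepts `"false"`, and uses the standard `execute().print()` to trigger the pipeline and inspect the results.

```java
// Minimal sketch: request synchronous prediction by overriding the
// "async" runtime option (assumes "false" is an accepted value).
Table syncPredictResult = model.predict(
    myTable,
    ColumnList.of("text"),
    Map.of("async", "false")
);

// Trigger execution and print the prediction results.
syncPredictResult.execute().print();
```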

{{< top >}}

<a name="data-types"></a>
Data Types
----------

docs/content/docs/dev/table/tableApi.md

Lines changed: 109 additions & 0 deletions
@@ -2735,6 +2735,115 @@ result = t.select(col('a'), col('c')) \
{{< query_state_warning >}}

### Model Inference

{{< label Streaming >}}

The Table API supports model inference operations that allow you to integrate machine learning models directly into your data processing pipelines. You can create models with specific providers and use them to run inference on your data.

#### Creating and Using Models

Models are created using a `ModelDescriptor`, which specifies the provider, the input/output schemas, and configuration options. Once created, you can use the model to make predictions on tables.

{{< tabs "model-inference" >}}
{{< tab "Java" >}}

```java
// 1. Set up the local environment
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// 2. Create a source table from in-memory data
Table myTable = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
);

// 3. Create the model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate text to Chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
);

Model model = tEnv.fromModel("my_model");

// 4. Use the model to translate text to Chinese
Table predictResult = model.predict(myTable, ColumnList.of("text"));

// 5. Async prediction example
Table asyncPredictResult = model.predict(
    myTable,
    ColumnList.of("text"),
    Map.of("async", "true")
);
```

{{< /tab >}}
{{< tab "Scala" >}}

```scala
// 1. Set up the local environment
val settings = EnvironmentSettings.inStreamingMode()
val tEnv = TableEnvironment.create(settings)

// 2. Create a source table from in-memory data
val myTable: Table = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
)

// 3. Create the model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate text to Chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
)

val model = tEnv.fromModel("my_model")

// 4. Use the model to translate text to Chinese
val predictResult = model.predict(myTable, ColumnList.of("text"))

// 5. Async prediction example
val asyncPredictResult = model.predict(
    myTable,
    ColumnList.of("text"),
    Map("async" -> "true").asJava
)
```

{{< /tab >}}
{{< tab "Python" >}}

```python
# Not yet supported in the Python Table API
```

{{< /tab >}}
{{< /tabs >}}

Model inference supports both synchronous and asynchronous prediction modes (when supported by the underlying `ModelProvider` interface).
By default, the planner uses asynchronous mode to maximize throughput for high-latency models by processing multiple requests concurrently.
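
Synchronous prediction can be requested per call through the same runtime option map. The following is a minimal sketch: it assumes the `async` option shown above also accepts `"false"`, and uses the standard `execute().print()` to run the pipeline and inspect the results.

```java
// Minimal sketch: request synchronous prediction by overriding the
// "async" runtime option (assumes "false" is an accepted value).
Table syncPredictResult = model.predict(
    myTable,
    ColumnList.of("text"),
    Map.of("async", "false")
);

// Trigger execution and print the prediction results.
syncPredictResult.execute().print();
```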

{{< top >}}

Data Types
----------
