Skip to content

Commit 8556217

Browse files
committed
update
1 parent 4fe95dd commit 8556217

19 files changed

Lines changed: 891 additions & 283 deletions

docs/Go-SDK/go-sdk-advanced-state-api.md

Lines changed: 314 additions & 0 deletions
Large diffs are not rendered by default.

docs/Go-SDK/go-sdk-guide.md

Lines changed: 6 additions & 215 deletions
Large diffs are not rendered by default.
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<!--
2+
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
20+
-->
21+
22+
# Python SDK — 高级状态 API
23+
24+
本文档介绍 Python SDK 的**高级状态 API**:基于底层 KvStore 的带类型状态抽象(ValueState、ListState、MapState 等),通过 **codec** 序列化,并支持按主键的 **keyed state**。设计与 [Go SDK 高级状态 API](../Go-SDK/go-sdk-guide.md#7-advanced-state-api) 对齐。
25+
26+
---
27+
28+
## 1. 概述
29+
30+
当需要结构化状态(单值、列表、Map、优先队列、聚合、归约)而不想手写字节编码或 key 布局时,可使用高级状态 API。创建方式有两种:通过 **Context**(如 `ctx.getOrCreateValueState(...)`)或通过状态类型上的**类型级构造方法**(推荐,便于复用,与 Go SDK 用法一致)。
31+
32+
---
33+
34+
## 2. 创建状态的两种方式
35+
36+
### 2.1 通过 Context(getOrCreate\*
37+
38+
`Context` 提供 `getOrCreateValueState(store_name, codec)``getOrCreateValueStateAutoCodec(store_name)` 等方法,以及 ListState、MapState、PriorityQueueState、AggregatingState、ReducingState 与所有 Keyed\* 工厂的对应方法。运行时实现会委托给下面所述的类型级 `from_context` / `from_context_auto_codec`
39+
40+
### 2.2 通过状态类型(推荐,与 Go SDK 一致)
41+
42+
每种状态类型和 keyed 工厂提供:
43+
44+
- **带 codec:** `XxxState.from_context(ctx, store_name, codec, ...)`
45+
- **AutoCodec:** `XxxState.from_context_auto_codec(ctx, store_name)` 或带可选类型参数,由 SDK 使用默认 codec(如 PickleCodec,或 Map key / PQ 元素所需的有序 codec)。
46+
47+
状态实例是轻量的;可在每次 `process` 中创建,或在 driver 中(如 `init`)缓存。同一 store 名称对应同一底层 store。
48+
49+
---
50+
51+
## 3. 非 Keyed 状态 — 构造方法一览
52+
53+
| 状态类型 | 带 codec | AutoCodec |
54+
|----------|----------|-----------|
55+
| ValueState | `ValueState.from_context(ctx, store_name, codec)` | `ValueState.from_context_auto_codec(ctx, store_name)` |
56+
| ListState | `ListState.from_context(ctx, store_name, codec)` | `ListState.from_context_auto_codec(ctx, store_name)` |
57+
| MapState | `MapState.from_context(ctx, store_name, key_codec, value_codec)``MapState.from_context_auto_key_codec(ctx, store_name, value_codec)` ||
58+
| PriorityQueueState | `PriorityQueueState.from_context(ctx, store_name, codec)` | `PriorityQueueState.from_context_auto_codec(ctx, store_name)` |
59+
| AggregatingState | `AggregatingState.from_context(ctx, store_name, acc_codec, agg_func)` | `AggregatingState.from_context_auto_codec(ctx, store_name, agg_func)` |
60+
| ReducingState | `ReducingState.from_context(ctx, store_name, value_codec, reduce_func)` | `ReducingState.from_context_auto_codec(ctx, store_name, reduce_func)` |
61+
62+
以上均可通过 Context 的 `ctx.getOrCreate*` 方法获得(如 `ctx.getOrCreateValueState(store_name, codec)`),其内部会委托给上述构造方法。
63+
64+
---
65+
66+
## 4. Keyed 状态 — 工厂与 key_group / key / namespace
67+
68+
**Keyed 状态面向 keyed 算子。** 流按 key 分区(如 keyBy)时,每个 key 拥有独立状态。可先获取一次**工厂**(通过 context、store 名称、**namespace****key_group**),再按**主键**(当前记录的流 key)创建状态。
69+
70+
### 4.1 key_group、key(主键)与 namespace
71+
72+
| 概念 | API 参数 | 含义 |
73+
|------|----------|------|
74+
| **key_group** | 创建工厂时的 `key_group` | **keyed 组**:标识该状态所属分区/组(如一组 “counters”,另一组 “sessions”)。 |
75+
| **key** | 工厂方法参数(如 `new_keyed_value(primary_key)`| 当前记录的**流 key 的值**(如用户 ID、分区 key)。不同 key 对应不同状态。 |
76+
| **namespace** | 创建工厂时的 `namespace`(bytes) | **有窗口时****窗口标识的 bytes****无窗口时****空 bytes**(如 `b""`)。 |
77+
78+
### 4.2 Keyed 工厂构造方法一览
79+
80+
| 工厂 | 带 codec | AutoCodec |
81+
|------|----------|-----------|
82+
| KeyedValueStateFactory | `KeyedValueStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec)` | `KeyedValueStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_type=None)` |
83+
| KeyedListStateFactory | `KeyedListStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec)` | `KeyedListStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_type=None)` |
84+
| KeyedMapStateFactory | `KeyedMapStateFactory.from_context(ctx, store_name, namespace, key_group, key_codec, value_codec)` | `KeyedMapStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_codec)` |
85+
| KeyedPriorityQueueStateFactory | `KeyedPriorityQueueStateFactory.from_context(ctx, store_name, namespace, key_group, item_codec)` | `KeyedPriorityQueueStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, item_type=None)` |
86+
| KeyedAggregatingStateFactory | `KeyedAggregatingStateFactory.from_context(ctx, store_name, namespace, key_group, acc_codec, agg_func)` | `KeyedAggregatingStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, agg_func, acc_type=None)` |
87+
| KeyedReducingStateFactory | `KeyedReducingStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec, reduce_func)` | `KeyedReducingStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, reduce_func, value_type=None)` |
88+
89+
也可使用 Context 的 `ctx.getOrCreateKeyed*Factory(...)` 方法,其内部会委托给上述构造方法。
90+
91+
---
92+
93+
## 5. 示例:使用 from_context_auto_codec 的 ValueState
94+
95+
```python
96+
from fs_api import FSProcessorDriver, Context
97+
from fs_api.store import ValueState
98+
99+
class CounterProcessor(FSProcessorDriver):
100+
def process(self, ctx: Context, source_id: int, data: bytes):
101+
# 每条消息创建一次状态(或在 init 中缓存)
102+
state = ValueState.from_context_auto_codec(ctx, "my-store")
103+
cur, _ = state.value() or (0, False)
104+
state.update(cur + 1)
105+
ctx.emit(str(cur + 1).encode(), 0)
106+
```
107+
108+
其他状态类型用法相同:按上表使用 `XxxState.from_context(ctx, store_name, ...)``XxxState.from_context_auto_codec(ctx, store_name)`
109+
110+
---
111+
112+
## 6. 参见
113+
114+
- [Python SDK 指南](python-sdk-guide-zh.md) — fs_api、fs_client 及 Context/KvStore 基础用法。
115+
- [Go SDK 指南 — 高级状态 API](../Go-SDK/go-sdk-guide.md#7-advanced-state-api) — Go SDK 中的等价 API。
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<!--
2+
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
20+
-->
21+
22+
# Python SDK — Advanced State API
23+
24+
This document describes the **high-level state API** for the Python SDK: typed state abstractions (ValueState, ListState, MapState, etc.) built on top of the low-level KvStore, with serialization via **codecs** and optional **keyed state** per primary key. The design aligns with the [Go SDK Advanced State API](../Go-SDK/go-sdk-guide.md#7-advanced-state-api).
25+
26+
---
27+
28+
## 1. Overview
29+
30+
Use the advanced state API when you need structured state (single value, list, map, priority queue, aggregation, reduction) without manual byte encoding or key layout. You can create state either from **Context** (e.g. `ctx.getOrCreateValueState(...)`) or via **type-level constructors** on the state class (recommended for clarity and reuse, same pattern as the Go SDK).
31+
32+
---
33+
34+
## 2. Creating State: Two Ways
35+
36+
### 2.1 From Context (getOrCreate\*)
37+
38+
`Context` defines methods such as `getOrCreateValueState(store_name, codec)`, `getOrCreateValueStateAutoCodec(store_name)`, and the same pattern for ListState, MapState, PriorityQueueState, AggregatingState, ReducingState, and all Keyed\* factories. The runtime implementation delegates to the type-level `from_context` / `from_context_auto_codec` methods below.
39+
40+
### 2.2 From the state type (recommended, same as Go SDK)
41+
42+
Each state type and keyed factory provides:
43+
44+
- **With codec:** `XxxState.from_context(ctx, store_name, codec, ...)` — you pass the codec(s).
45+
- **AutoCodec:** `XxxState.from_context_auto_codec(ctx, store_name)` or with optional type hint — the SDK uses a default codec (e.g. `PickleCodec`, or ordered codecs for map key / PQ element where required).
46+
47+
State instances are lightweight; you may create them per message in `process` or cache in the driver (e.g. in `init`). Same store name yields the same underlying store.
48+
49+
---
50+
51+
## 3. Non-Keyed State — Constructor Summary
52+
53+
| State | With codec | AutoCodec |
54+
|-------|------------|-----------|
55+
| ValueState | `ValueState.from_context(ctx, store_name, codec)` | `ValueState.from_context_auto_codec(ctx, store_name)` |
56+
| ListState | `ListState.from_context(ctx, store_name, codec)` | `ListState.from_context_auto_codec(ctx, store_name)` |
57+
| MapState | `MapState.from_context(ctx, store_name, key_codec, value_codec)` or `MapState.from_context_auto_key_codec(ctx, store_name, value_codec)` ||
58+
| PriorityQueueState | `PriorityQueueState.from_context(ctx, store_name, codec)` | `PriorityQueueState.from_context_auto_codec(ctx, store_name)` |
59+
| AggregatingState | `AggregatingState.from_context(ctx, store_name, acc_codec, agg_func)` | `AggregatingState.from_context_auto_codec(ctx, store_name, agg_func)` |
60+
| ReducingState | `ReducingState.from_context(ctx, store_name, value_codec, reduce_func)` | `ReducingState.from_context_auto_codec(ctx, store_name, reduce_func)` |
61+
62+
All of the above can also be obtained via the corresponding `ctx.getOrCreate*` methods (e.g. `ctx.getOrCreateValueState(store_name, codec)`), which delegate to these constructors.
63+
64+
---
65+
66+
## 4. Keyed State — Factories and keyGroup / key / namespace
67+
68+
**Keyed state is for keyed operators.** When the stream is partitioned by a key (e.g. after keyBy), each key gets isolated state. You obtain a **factory** once (from context, store name, **namespace**, and **key_group**), then create state **per primary key** (the stream key for the current record).
69+
70+
### 4.1 keyGroup, key (primaryKey), and namespace
71+
72+
| Term | API parameter | Meaning |
73+
|------|----------------|---------|
74+
| **key_group** | `key_group` when creating the factory | The **keyed group**: identifies which keyed partition/group this state belongs to (e.g. one group for "counters", another for "sessions"). |
75+
| **key** | The argument to factory methods (e.g. `new_keyed_value(primary_key)`) | The **value of the stream key** for the current record (e.g. user ID, partition key). Each distinct key value gets isolated state. |
76+
| **namespace** | `namespace` (bytes) when creating the factory | **If a window function is present**, use the **window identifier as bytes**. **Without windows**, pass **empty bytes** (e.g. `b""`). |
77+
78+
### 4.2 Factory constructor summary (keyed)
79+
80+
| Factory | With codec | AutoCodec |
81+
|---------|------------|-----------|
82+
| KeyedValueStateFactory | `KeyedValueStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec)` | `KeyedValueStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_type=None)` |
83+
| KeyedListStateFactory | `KeyedListStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec)` | `KeyedListStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_type=None)` |
84+
| KeyedMapStateFactory | `KeyedMapStateFactory.from_context(ctx, store_name, namespace, key_group, key_codec, value_codec)` | `KeyedMapStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_codec)` |
85+
| KeyedPriorityQueueStateFactory | `KeyedPriorityQueueStateFactory.from_context(ctx, store_name, namespace, key_group, item_codec)` | `KeyedPriorityQueueStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, item_type=None)` |
86+
| KeyedAggregatingStateFactory | `KeyedAggregatingStateFactory.from_context(ctx, store_name, namespace, key_group, acc_codec, agg_func)` | `KeyedAggregatingStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, agg_func, acc_type=None)` |
87+
| KeyedReducingStateFactory | `KeyedReducingStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec, reduce_func)` | `KeyedReducingStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, reduce_func, value_type=None)` |
88+
89+
You can also use the corresponding `ctx.getOrCreateKeyed*Factory(...)` methods, which delegate to these constructors.
90+
91+
---
92+
93+
## 5. Example: ValueState with from_context_auto_codec
94+
95+
```python
96+
from fs_api import FSProcessorDriver, Context
97+
from fs_api.store import ValueState
98+
99+
class CounterProcessor(FSProcessorDriver):
100+
def process(self, ctx: Context, source_id: int, data: bytes):
101+
# Create state per message (or cache in init)
102+
state = ValueState.from_context_auto_codec(ctx, "my-store")
103+
cur, _ = state.value() or (0, False)
104+
state.update(cur + 1)
105+
ctx.emit(str(cur + 1).encode(), 0)
106+
```
107+
108+
Same pattern for other state types: use `XxxState.from_context(ctx, store_name, ...)` or `XxxState.from_context_auto_codec(ctx, store_name)` as in the tables above.
109+
110+
---
111+
112+
## 6. See also
113+
114+
- [Python SDK Guide](python-sdk-guide.md) — main guide for fs_api, fs_client, and basic Context/KvStore usage.
115+
- [Go SDK Guide — Advanced State API](../Go-SDK/go-sdk-guide.md#7-advanced-state-api) — equivalent API in the Go SDK.

docs/Python-SDK/python-sdk-guide-zh.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,11 @@ with FsClient(host="10.0.0.1", port=8080) as client:
148148
| BadRequestError (400) | YAML 配置不满足规范或 Kafka 参数错误 | 检查 WasmTaskBuilder 中的配置项。 |
149149
| ServerError (500) | Server 侧运行时环境(如 RocksDB)异常 | 检查服务端 conf/config.yaml 存储路径权限。 |
150150
| NotFoundError (404) | 操作了不存在的函数或无效的 Checkpoint | 确认函数名是否输入正确。 |
151+
152+
---
153+
154+
## 五、Advanced State API(高级状态 API)
155+
156+
带类型状态(ValueState、ListState、MapState、Keyed\* 工厂等)及 `from_context` / `from_context_auto_codec` 用法请参见独立文档:
157+
158+
- **[Python SDK — 高级状态 API](python-sdk-advanced-state-api-zh.md)**

docs/Python-SDK/python-sdk-guide.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,11 @@ with FsClient(host="10.0.0.1", port=8080) as client:
148148
| BadRequestError (400) | YAML configuration does not meet specifications or Kafka parameters are incorrect | Check configuration items in WasmTaskBuilder. |
149149
| ServerError (500) | Server-side runtime environment (e.g., RocksDB) exception | Check permissions of storage path in server conf/config.yaml. |
150150
| NotFoundError (404) | Operating on a non-existent function or invalid Checkpoint | Confirm if the function name is correct. |
151+
152+
---
153+
154+
## 5. Advanced State API
155+
156+
For typed state (ValueState, ListState, MapState, Keyed\* factories, etc.) and `from_context` / `from_context_auto_codec` usage, see the dedicated document:
157+
158+
- **[Python SDK — Advanced State API](python-sdk-advanced-state-api.md)**

python/functionstream-api/src/fs_api/store/keyed/keyed_aggregating_state.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13-
from typing import Generic, Optional, Protocol, Tuple, TypeVar
13+
from typing import Any, Generic, Optional, Protocol, Tuple, TypeVar
1414

1515
from ..codec import Codec
1616
from ..complexkey import ComplexKey
@@ -55,6 +55,36 @@ def __init__(
5555
self._acc_codec = acc_codec
5656
self._agg_func = agg_func
5757

58+
@classmethod
59+
def from_context(
60+
cls,
61+
ctx: Any,
62+
store_name: str,
63+
namespace: bytes,
64+
key_group: bytes,
65+
acc_codec: Codec[ACC],
66+
agg_func: "AggregateFunc[T_agg, ACC, R]",
67+
) -> "KeyedAggregatingStateFactory[T_agg, ACC, R]":
68+
"""Create a KeyedAggregatingStateFactory from a context and store name (for keyed operators)."""
69+
store = ctx.getOrCreateKVStore(store_name)
70+
return cls(store, namespace, key_group, acc_codec, agg_func)
71+
72+
@classmethod
73+
def from_context_auto_codec(
74+
cls,
75+
ctx: Any,
76+
store_name: str,
77+
namespace: bytes,
78+
key_group: bytes,
79+
agg_func: "AggregateFunc[T_agg, ACC, R]",
80+
acc_type: Optional[type] = None,
81+
) -> "KeyedAggregatingStateFactory[T_agg, ACC, R]":
82+
"""Create a KeyedAggregatingStateFactory with default accumulator codec from context and store name."""
83+
from ..codec import PickleCodec, default_codec_for
84+
store = ctx.getOrCreateKVStore(store_name)
85+
codec = default_codec_for(acc_type) if acc_type is not None else PickleCodec()
86+
return cls(store, namespace, key_group, codec, agg_func)
87+
5888
def new_aggregating(self, key_codec: Codec[K]) -> "KeyedAggregatingState[K, T_agg, ACC, R]":
5989
ensure_ordered_key_codec(key_codec, "keyed aggregating")
6090
return KeyedAggregatingState(

python/functionstream-api/src/fs_api/store/keyed/keyed_list_state.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# limitations under the License.
1212

1313
import struct
14-
from typing import Generic, List, Optional, TypeVar
14+
from typing import Any, Generic, List, Optional, TypeVar
1515

1616
from ..codec import Codec
1717
from ..complexkey import ComplexKey
@@ -45,6 +45,34 @@ def __init__(
4545
self._key_group = key_group
4646
self._value_codec = value_codec
4747

48+
@classmethod
49+
def from_context(
50+
cls,
51+
ctx: Any,
52+
store_name: str,
53+
namespace: bytes,
54+
key_group: bytes,
55+
value_codec: Codec[V],
56+
) -> "KeyedListStateFactory[V]":
57+
"""Create a KeyedListStateFactory from a context and store name (for keyed operators)."""
58+
store = ctx.getOrCreateKVStore(store_name)
59+
return cls(store, namespace, key_group, value_codec)
60+
61+
@classmethod
62+
def from_context_auto_codec(
63+
cls,
64+
ctx: Any,
65+
store_name: str,
66+
namespace: bytes,
67+
key_group: bytes,
68+
value_type: Optional[type] = None,
69+
) -> "KeyedListStateFactory[V]":
70+
"""Create a KeyedListStateFactory with default codec from context and store name."""
71+
from ..codec import PickleCodec, default_codec_for
72+
store = ctx.getOrCreateKVStore(store_name)
73+
codec = default_codec_for(value_type) if value_type is not None else PickleCodec()
74+
return cls(store, namespace, key_group, codec)
75+
4876
def new_list(self, key_codec: Codec[K]) -> "KeyedListState[K, V]":
4977
ensure_ordered_key_codec(key_codec, "keyed list")
5078
return KeyedListState(

0 commit comments

Comments
 (0)