|
79 | 79 | | 概念 | API 参数 | 含义 | |
80 | 80 | |---------------|------------------------------------------|---------------------------------------------------------| |
81 | 81 | | **key_group** | 创建工厂时的 `key_group` | **keyed 组**:标识该状态所属分区/组(如一组 “counters”,另一组 “sessions”)。 | |
82 | | -| **key** | 工厂方法参数(如 `new_keyed_value(primary_key)`) | 当前记录的**流 key 的值**(如用户 ID、分区 key)。不同 key 对应不同状态。 | |
| 82 | +| **key** | 构造状态时的 `primary_key`(如 `KeyedValueState(factory, primary_key, namespace)`) | 当前记录的**流 key 的值**(如用户 ID、分区 key)。不同 key 对应不同状态。 | |
83 | 83 | | **namespace** | 创建工厂时的 `namespace`(bytes) | **有窗口时**为**窗口标识的 bytes**;**无窗口时**传**空 bytes**(如 `b""`)。 | |
84 | 84 |
|
85 | 85 | ### 4.2 Keyed 工厂构造方法一览 |
86 | 86 |
|
87 | 87 | | 工厂 | 带 codec | AutoCodec | |
88 | 88 | |--------------------------------|-----------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------| |
89 | | -| KeyedValueStateFactory | `KeyedValueStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec)` | `KeyedValueStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_type=None)` | |
90 | | -| KeyedListStateFactory | `KeyedListStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec)` | `KeyedListStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_type=None)` | |
91 | | -| KeyedMapStateFactory | `KeyedMapStateFactory.from_context(ctx, store_name, namespace, key_group, key_codec, value_codec)` | `KeyedMapStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, value_codec)` | |
92 | | -| KeyedPriorityQueueStateFactory | `KeyedPriorityQueueStateFactory.from_context(ctx, store_name, namespace, key_group, item_codec)` | `KeyedPriorityQueueStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, item_type=None)` | |
93 | | -| KeyedAggregatingStateFactory | `KeyedAggregatingStateFactory.from_context(ctx, store_name, namespace, key_group, acc_codec, agg_func)` | `KeyedAggregatingStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, agg_func, acc_type=None)` | |
94 | | -| KeyedReducingStateFactory | `KeyedReducingStateFactory.from_context(ctx, store_name, namespace, key_group, value_codec, reduce_func)` | `KeyedReducingStateFactory.from_context_auto_codec(ctx, store_name, namespace, key_group, reduce_func, value_type=None)` | |
| 89 | +| KeyedValueStateFactory | `KeyedValueStateFactory.from_context(ctx, store_name, key_group, value_codec)` | `KeyedValueStateFactory.from_context_auto_codec(ctx, store_name, key_group, value_type=None)` | |
| 90 | +| KeyedListStateFactory | `KeyedListStateFactory.from_context(ctx, store_name, key_group, value_codec)` | `KeyedListStateFactory.from_context_auto_codec(ctx, store_name, key_group, value_type=None)` | |
| 91 | +| KeyedMapStateFactory | `KeyedMapStateFactory.from_context(ctx, store_name, key_group, map_key_codec, map_value_codec)` | `KeyedMapStateFactory.from_context_auto_codec(ctx, store_name, key_group, map_key_type=None, map_value_type=None)` | |
| 92 | +| KeyedPriorityQueueStateFactory | `KeyedPriorityQueueStateFactory.from_context(ctx, store_name, key_group, item_codec)` | `KeyedPriorityQueueStateFactory.from_context_auto_codec(ctx, store_name, key_group, item_type=None)` | |
| 93 | +| KeyedAggregatingStateFactory | `KeyedAggregatingStateFactory.from_context(ctx, store_name, key_group, acc_codec, agg_func)` | `KeyedAggregatingStateFactory.from_context_auto_codec(ctx, store_name, key_group, agg_func, acc_type=None)` | |
| 94 | +| KeyedReducingStateFactory | `KeyedReducingStateFactory.from_context(ctx, store_name, key_group, value_codec, reduce_func)` | `KeyedReducingStateFactory.from_context_auto_codec(ctx, store_name, key_group, reduce_func, value_type=None)` | |
95 | 95 |
|
96 | 96 | 也可使用 Context 的 `ctx.getOrCreateKeyed*Factory(...)` 方法,其内部会委托给上述构造方法。 |
97 | 97 |
|
98 | 98 | ### 4.3 KeyedValueState |
99 | 99 |
|
100 | | -KeyedValueState 只需 **value codec**,不要求有序。工厂创建状态:`factory.new_keyed_value(primary_key, state_name="")`,得到 `KeyedValueState[V]`。状态方法:`update(value)`、`value()`(返回 `Optional[V]`)、`clear()`。主键由创建时传入的 `primary_key`(bytes)固定。 |
| 100 | +KeyedValueState 与 Go SDK 一致:工厂仅需 `key_group`(无 namespace)。工厂:`KeyedValueStateFactory.from_context(ctx, store_name, key_group, value_codec)` 或 `from_context_auto_codec(ctx, store_name, key_group, value_type=None)`。构造状态:`KeyedValueState(factory, primary_key, namespace)`,其中 namespace 可为 `state_name.encode("utf-8")`。状态方法:`update(value)`、`value()`(返回 `(value, found)`)、`clear()`。 |
| 101 | + |
| 102 | +### 4.4 KeyedListState |
| 103 | + |
| 104 | +KeyedListState 与 Go SDK 一致:工厂仅需 `key_group`(无 namespace),创建列表时再传入 **key** 与 **namespace**。工厂:`KeyedListStateFactory.from_context(ctx, store_name, key_group, value_codec)` 或 `from_context_auto_codec(ctx, store_name, key_group, value_type=None)`。创建列表:`factory.new_keyed_list(key, namespace)`,得到 `KeyedListState[V]`。状态方法:`add(value)`、`add_all(values)`、`get()`(返回 `List[V]`)、`update(values)`(先清空再整体写入)、`clear()`。 |
| 105 | + |
| 106 | +### 4.5 KeyedAggregatingState |
| 107 | + |
| 108 | +KeyedAggregatingState 与 Go SDK 一致:工厂仅需 `key_group`(无 namespace)。工厂:`KeyedAggregatingStateFactory.from_context(ctx, store_name, key_group, acc_codec, agg_func)` 或 `from_context_auto_codec(ctx, store_name, key_group, agg_func, acc_type=None)`。创建状态:`factory.new_aggregating_state(primary_key, state_name="")`,得到绑定到该 (primary_key, namespace=state_name) 的 `KeyedAggregatingState[T, ACC, R]`。状态方法:`add(value)`(向当前状态的 accumulator 合并)、`get()`(返回 `(result, found)`)、`clear()`。 |
| 109 | + |
| 110 | +### 4.6 KeyedMapState |
| 111 | + |
| 112 | +KeyedMapState 与 Go SDK 一致:工厂仅需 `key_group`(无 namespace),且 map key 的 codec 必须有序。工厂:`KeyedMapStateFactory.from_context(ctx, store_name, key_group, map_key_codec, map_value_codec)` 或 `from_context_auto_codec(ctx, store_name, key_group, map_key_type=None, map_value_type=None)`。创建 map:`factory.new_keyed_map(primary_key, map_name)`(map_name 必填,转为 namespace),得到 `KeyedMapState[MK, MV]`。状态方法:`put(map_key, value)`、`get(map_key)`(返回 `(value, found)`)、`delete(map_key)`、`clear()`(按前缀删除本 map 全部条目)、`all()`(迭代 `(map_key, value)`)。 |
| 113 | + |
| 114 | +### 4.7 KeyedPriorityQueueState |
| 115 | + |
| 116 | +KeyedPriorityQueueState 与 Go SDK 一致:工厂仅需 `key_group`(无 namespace),元素 codec 必须有序。工厂:`KeyedPriorityQueueStateFactory.from_context(ctx, store_name, key_group, item_codec)` 或 `from_context_auto_codec(ctx, store_name, key_group, item_type=None)`。创建队列:`factory.new_keyed_priority_queue(primary_key, namespace)`(primary_key 与 namespace 均必填,bytes),得到 `KeyedPriorityQueueState[V]`。状态方法:`add(value)`、`peek()`(返回 `(min_element, found)`)、`poll()`(取出并返回最小元素)、`clear()`(按前缀删除全部)、`all()`(按序迭代所有元素)。 |
| 117 | + |
| 118 | +### 4.8 KeyedReducingState |
| 119 | + |
| 120 | +KeyedReducingState 与 Go SDK 一致:工厂仅需 `key_group`(无 namespace)。工厂:`KeyedReducingStateFactory.from_context(ctx, store_name, key_group, value_codec, reduce_func)` 或 `from_context_auto_codec(ctx, store_name, key_group, reduce_func, value_type=None)`。创建状态:`factory.new_reducing_state(primary_key, namespace)`(两者必填,bytes),得到 `KeyedReducingState[V]`。状态方法:`add(value)`(与当前值经 reduce_func 合并后写入)、`get()`(返回 `(value, found)`)、`clear()`。 |
101 | 121 |
|
102 | 122 | --- |
103 | 123 |
|
@@ -127,19 +147,19 @@ class CounterProcessor(FSProcessorDriver): |
127 | 147 |
|
128 | 148 | ```python |
129 | 149 | from fs_api import FSProcessorDriver, Context |
130 | | -from fs_api_advanced import KeyedValueStateFactory |
| 150 | +from fs_api_advanced import KeyedValueState, KeyedValueStateFactory |
131 | 151 |
|
132 | 152 | class KeyedCounterProcessor(FSProcessorDriver): |
133 | 153 | def init(self, ctx: Context, config: dict): |
134 | 154 | self._factory = KeyedValueStateFactory.from_context_auto_codec( |
135 | | - ctx, "counters", b"", b"by_key", value_type=int |
| 155 | + ctx, "counters", b"by_key", value_type=int |
136 | 156 | ) |
137 | 157 |
|
138 | 158 | def process(self, ctx: Context, source_id: int, data: bytes): |
139 | 159 | primary_key = data[:8] |
140 | | - state = self._factory.new_keyed_value(primary_key, "count") |
141 | | - cur = state.value() |
142 | | - if cur is None: |
| 160 | + state = KeyedValueState(self._factory, primary_key, "count".encode("utf-8")) |
| 161 | + cur, found = state.value() |
| 162 | + if not found: |
143 | 163 | cur = 0 |
144 | 164 | state.update(cur + 1) |
145 | 165 | ctx.emit(str(cur + 1).encode(), 0) |
|
0 commit comments