You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
| KeyedValueStateFactory[V]|`NewKeyedValue(primaryKey []byte, namespace []byte) (*KeyedValueState[V], error)`| One value state per (primaryKey, namespace). |
195
196
| KeyedListStateFactory[V]|`NewKeyedList(primaryKey []byte, namespace []byte) (*KeyedListState[V], error)`| List state per (primaryKey, namespace). |
196
197
| KeyedMapStateFactory[MK,MV]|`NewKeyedMap(primaryKey []byte, mapName string) (*KeyedMapState[MK,MV], error)`| Map state per (primaryKey, mapName). |
197
198
| KeyedPriorityQueueStateFactory[V]|`NewKeyedPriorityQueue(primaryKey []byte, namespace []byte) (*KeyedPriorityQueueState[V], error)`| PQ state per (primaryKey, namespace). |
|**key_group**|`key_group` when creating the factory | The **keyed group**: identifies which keyed partition/group this state belongs to (e.g. one group for "counters", another for "sessions"). |
82
-
|**key**|`primary_key` when constructing state (e.g. `KeyedValueState(factory, primary_key, namespace)`) | The **value of the stream key** for the current record (e.g. user ID, partition key). Each distinct key value gets isolated state. |
82
+
|**key**|Argument to factory methods (e.g. `new_keyed_value(primary_key, namespace)`) | The **value of the stream key** for the current record (e.g. user ID, partition key). Each distinct key value gets isolated state. |
83
83
|**namespace**|`namespace` (bytes) when creating the factory |**If a window function is present**, use the **window identifier as bytes**. **Without windows**, pass **empty bytes** (e.g. `b""`). |
84
84
85
85
### 4.2 Factory constructor summary (keyed)
@@ -97,7 +97,7 @@ You can also use the corresponding `ctx.getOrCreateKeyed*Factory(...)` methods,
97
97
98
98
### 4.3 KeyedValueState
99
99
100
-
KeyedValueState aligns with the Go SDK: the factory takes only `key_group` (no namespace). Factory: `KeyedValueStateFactory.from_context(ctx, store_name, key_group, value_codec)` or `from_context_auto_codec(ctx, store_name, key_group, value_type=None)`. Construct state: `KeyedValueState(factory, primary_key, namespace)`with e.g. `namespace = state_name.encode("utf-8")`. State methods: `update(value)`, `value()` (returns `(value, found)`), `clear()`.
100
+
KeyedValueState aligns with the Go SDK: the factory takes only `key_group` (no namespace). Factory: `KeyedValueStateFactory.from_context(ctx, store_name, key_group, value_codec)` or `from_context_auto_codec(ctx, store_name, key_group, value_type=None)`. Create state: `factory.new_keyed_value(primary_key, namespace)`(namespace is bytes, required). State methods: `update(value)`, `value()` (returns `(value, found)`), `clear()`.
101
101
102
102
### 4.4 KeyedListState
103
103
@@ -147,7 +147,7 @@ When the stream is partitioned by key, create the factory in `init` and obtain s
147
147
148
148
```python
149
149
from fs_api import FSProcessorDriver, Context
150
-
from fs_api_advanced importKeyedValueState, KeyedValueStateFactory
150
+
from fs_api_advanced import KeyedValueStateFactory
151
151
152
152
classKeyedCounterProcessor(FSProcessorDriver):
153
153
definit(self, ctx: Context, config: dict):
@@ -157,7 +157,7 @@ class KeyedCounterProcessor(FSProcessorDriver):
Copy file name to clipboardExpand all lines: python/functionstream-api-advanced/src/fs_api_advanced/keyed/keyed_value_state.py
+11-1Lines changed: 11 additions & 1 deletion
Original file line number
Diff line number
Diff line change
@@ -19,7 +19,7 @@
19
19
20
20
21
21
classKeyedValueStateFactory(Generic[V]):
22
-
"""Factory for keyed value state. Create from context with key_group; construct KeyedValueState(factory, primary_key, namespace) per (primary_key, namespace)."""
22
+
"""Factory for keyed value state. Create from context with key_group; obtain state via new_keyed_value(primary_key, namespace)."""
23
23
24
24
def__init__(
25
25
self,
@@ -64,6 +64,16 @@ def from_context_auto_codec(
64
64
codec=default_codec_for(value_type)
65
65
returncls(store, key_group, codec)
66
66
67
+
defnew_keyed_value(
68
+
self, primary_key: bytes, namespace: bytes
69
+
) ->"KeyedValueState[V]":
70
+
"""Create a KeyedValueState for the given primary key and namespace."""
71
+
ifprimary_keyisNone:
72
+
raiseKvError("keyed value state primary_key must not be None")
73
+
ifnamespaceisNone:
74
+
raiseKvError("keyed value state namespace is required")
0 commit comments