Skip to content

StreamingDataFrame: retain a custom stream_id across operations #925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 10, 2025

Conversation

daniil-quix
Copy link
Collaborator

@daniil-quix daniil-quix commented Jun 9, 2025

Problem

In #836, a custom stream_id parameter was introduced to the StreamingDataFrame class and its __dataframe_clone__ method; however, calling __dataframe_clone__ again reset the stream_id back to the default value obtained from the underlying topics.

The stream_id is used as part of the State stores' names, and it wasn't propagated correctly, leading to incorrect store names in some rare cases.
This PR corrects that, but the state stores created after .filter() or .apply() operations on the grouped DataFrame won't be accessible anymore.

:

topic = app.topic('<some-topic-with-one-partition>')

sdf = StreamingDataFrame(topic)  # stream_id = "<some-topic-with-one-partition>"

sdf_grouped = sdf.group_by("column")  # stream_id = "column--groupby--<some-topic-with-one-partition>"

# The store gets registered for the stream_id "column--groupby--<some-topic-with-one-partition>"
sdf_windowed = sdf_grouped.tumbling_window(...).sum().final()

# But here the state was registered under the stream_id "<some-topic-with-one-partition>".
# The correct stream_id is  "column--groupby--<some-topic-with-one-partition>"
sdf_windowed.apply(..., stateful=True)

Solution

  • Pass stream_id to the cloned dataframes
  • Update StreamingDataFrame.concat() to generate a new stream_id when concatenating branches with different stream_ids (possible when concatenating the group_by-ed dataframe with a one-partition topic

@daniil-quix daniil-quix merged commit b50675e into main Jun 10, 2025
4 checks passed
@daniil-quix daniil-quix deleted the fix/groupby-stream_id branch June 10, 2025 11:19
jbrass pushed a commit to jbrass/quix-streams that referenced this pull request Jun 10, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants