Add support for session window #917

Open. petrpan26 wants to merge 4 commits into main from petrpan26/session-window.

petrpan26

Session Windows Implementation for Quix Streams

This PR introduces Session Windows - dynamic windowing that groups events by activity sessions rather than fixed time intervals.

Key Features

  • Activity-driven sessions: Sessions extend automatically when events arrive within the timeout period
  • Flexible configuration: Configurable timeout and grace periods for late events
  • Full aggregation support: Works with all existing aggregation functions (sum, count, mean, max, min, reduce, collect)
  • Both modes supported: .current() for real-time updates, .final() for complete sessions
  • Closing strategies: Both "key" and "partition" level session expiration
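
To make the "activity-driven" behaviour concrete, here is a minimal standalone sketch of how events could be grouped into sessions. It is not the code from this PR: the function name sessionize is hypothetical, grace periods and out-of-order events are ignored, and treating an event that lands exactly on the session end as part of that session is an assumption.

```python
from typing import List, Tuple


def sessionize(timestamps_ms: List[int], timeout_ms: int) -> List[Tuple[int, int]]:
    """Group event timestamps into (start, end) sessions.

    Hypothetical helper: an event joins the open session when it arrives
    no later than the session's current end (last event + timeout);
    otherwise it opens a new session.
    """
    sessions: List[Tuple[int, int]] = []
    for ts in sorted(timestamps_ms):
        if sessions and ts <= sessions[-1][1]:
            # Event falls inside the open session: push the end forward.
            start, _ = sessions[-1]
            sessions[-1] = (start, ts + timeout_ms)
        else:
            # Gap exceeded the timeout: start a new session.
            sessions.append((ts, ts + timeout_ms))
    return sessions


print(sessionize([0, 5, 30], timeout_ms=10))  # [(0, 15), (30, 40)]
```

The event at 5 extends the first session to 15, while the event at 30 arrives after that end and opens a second session.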

Usage Examples

# User session tracking (30min timeout, 5min grace)
sdf.session_window(timeout_ms=timedelta(minutes=30), grace_ms=timedelta(minutes=5)) \
   .agg(page_views=Count(), actions=Collect("action_type")) \
   .final()

# Real-time fraud detection
sdf.session_window(timeout_ms=timedelta(minutes=10)) \
   .agg(transaction_count=Count(), total_amount=Sum("amount")) \
   .current()

Use Cases Enabled

  • User activity tracking (web/app sessions)
  • Fraud detection (transaction patterns)
  • IoT device monitoring (activity sessions)
  • Gaming analytics (player sessions)

Testing & Documentation

  • 32 comprehensive tests covering all session window functionality
  • Complete documentation added to docs/windowing.md with examples
  • Session merging scenarios and edge cases tested
  • Multi-key session management validated

Technical Implementation

  • Smart session extension logic with grace period support
  • Efficient RocksDB-backed state management
  • Proper transaction handling for concurrent access
  • Memory-efficient session garbage collection

This enables sophisticated streaming analytics use cases that were previously difficult to implement with fixed-time windows.

petrpan26 force-pushed the petrpan26/session-window branch from d2b4108 to a8b14a2 on June 3, 2025, 15:38
petrpan26 force-pushed the petrpan26/session-window branch from a8b14a2 to 07ac02a on June 3, 2025, 15:41

gwaramadze left a comment

Hello. Once again, thanks for your contributions. Here is the first batch of comments. Please take a look. We will continue to review and test it.

@@ -9,7 +9,9 @@ With windows, you can calculate such aggregations as:
- Total of website visitors for every hour
- The average speed of a vehicle over the last 10 minutes
- Maximum temperature of a sensor observed over 30 second ranges
- Give an user a reward after 10 succesful actions
- Give an user a reward after 10 succesful actions

Suggested change
- Give an user a reward after 10 succesful actions
- Give a user a reward after 10 successful actions

Grace: 2 seconds

Session 1: [0, 20] - Events A, B (B extends the session from A)
Session 2: [25, 35] - Events C, D (D extends the session from C)

Shouldn't that be [25, 40]?

Suggested change
Session 2: [25, 35] - Events C, D (D extends the session from C)
Session 2: [25, 40] - Events C, D (D extends the session from C)
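
The arithmetic behind the suggested [25, 40] can be checked with a small sketch. The event timestamps and the 10-second timeout below are assumptions inferred from the Session 1 bounds, since this excerpt does not show them; session_bounds is a hypothetical helper, not part of the PR.

```python
from typing import List, Tuple


def session_bounds(event_times_s: List[int], timeout_s: int) -> Tuple[int, int]:
    """Return (start, end): first event time, and last event time + timeout."""
    return min(event_times_s), max(event_times_s) + timeout_s


# Assumed timestamps (seconds): A=0, B=10, C=25, D=30; assumed timeout: 10
print(session_bounds([0, 10], timeout_s=10))   # Session 1 -> (0, 20)
print(session_bounds([25, 30], timeout_s=10))  # Session 2 -> (25, 40)
```

Under these assumptions, [25, 35] is the end of Session 2 before D arrives; once D at 30 extends it, the end moves to 40.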

Comment on lines +567 to +570
    @property
    def timeout_ms(self) -> int:
        return self._timeout_ms

I don't think this property is used anywhere

Suggested change
    @property
    def timeout_ms(self) -> int:
        return self._timeout_ms

    partition = ctx.partition
    offset = ctx.offset
except:
    # In test environments, message context might not be available

We must not mock objects for test purposes conditionally in the lib code. Sliding windows have mock_message_context, so it should be reused where necessary.
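
One way to picture the separation being asked for: the library code reads the context unconditionally and fails loudly when it is missing, while tests install a fake context explicitly. The sketch below is standalone and uses hypothetical names throughout (MessageContext, message_context, fake_message_context, describe_position). It is not the actual mock_message_context helper mentioned above, whose signature is not shown here.

```python
import contextvars
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator


@dataclass(frozen=True)
class MessageContext:
    """Hypothetical stand-in for the real message context."""
    partition: int
    offset: int


_current = contextvars.ContextVar("message_context")


def message_context() -> MessageContext:
    """Library side: no test-only fallback; a missing context is an error."""
    return _current.get()  # raises LookupError when no context is set


@contextmanager
def fake_message_context(partition: int = 0, offset: int = 0) -> Iterator[MessageContext]:
    """Test side: install a fake context for the duration of a test."""
    ctx = MessageContext(partition=partition, offset=offset)
    token = _current.set(ctx)
    try:
        yield ctx
    finally:
        _current.reset(token)


def describe_position() -> str:
    """Example library function that needs the context."""
    ctx = message_context()
    return f"partition={ctx.partition} offset={ctx.offset}"


with fake_message_context(partition=3, offset=42):
    print(describe_position())  # partition=3 offset=42
```

With this split, the production path contains no bare except and no branch that exists only for tests.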

            round(time.monotonic() - start, 2),
        )

    def _on_expired_session(

This method is almost identical to the existing _on_expired_window; we should abstract the shared logic.
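
A rough sketch of one possible shape for that abstraction follows. The real bodies of _on_expired_window and _on_expired_session are not shown in this excerpt, so the parameters, the log message, and the class names (ExpiredIntervalMixin, FixedWindowSketch, SessionWindowSketch) are all hypothetical; only the idea of one shared helper with two thin wrappers is the point.

```python
import logging

logger = logging.getLogger(__name__)


class ExpiredIntervalMixin:
    """Hypothetical mixin holding the logic both window types share."""

    def _on_expired(
        self, kind: str, start: int, end: int, timestamp_ms: int, late_by_ms: int
    ) -> str:
        message = (
            f"Skipping {kind} [{start}, {end}): "
            f"event at {timestamp_ms} is late by {late_by_ms} ms"
        )
        logger.warning(message)
        return message


class FixedWindowSketch(ExpiredIntervalMixin):
    def _on_expired_window(self, start: int, end: int, timestamp_ms: int, late_by_ms: int) -> str:
        # Thin wrapper: only the label differs from the session variant.
        return self._on_expired("window", start, end, timestamp_ms, late_by_ms)


class SessionWindowSketch(ExpiredIntervalMixin):
    def _on_expired_session(self, start: int, end: int, timestamp_ms: int, late_by_ms: int) -> str:
        return self._on_expired("session", start, end, timestamp_ms, late_by_ms)
```

Whether a mixin, a shared base-class method, or a module-level function fits best depends on how the existing window classes are organised.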

Comment on lines +479 to +484
# By this point, session_start and session_end are guaranteed to be set
assert session_start is not None # noqa: S101
assert session_end is not None # noqa: S101

# Output intermediate results for aggregations
if aggregate:

The asserts are unnecessary. If we drop them, the if that follows also becomes redundant.

Suggested change
# By this point, session_start and session_end are guaranteed to be set
assert session_start is not None # noqa: S101
assert session_end is not None # noqa: S101
# Output intermediate results for aggregations
if aggregate:


# Clean up expired windows
for window_start, window_end in windows_to_delete:
    state._transaction.delete_window(  # type: ignore # noqa: SLF001

This uses the private attribute _transaction; this must be fixed.

@@ -1484,6 +1485,103 @@ def sliding_count_window(
            name=name,
        )

    def session_window(
        self,
        timeout_ms: Union[int, timedelta],

A more widely adopted nomenclature for session windows is "gap". Let's rename it to gap_ms or a more explicit inactivity_gap_ms.
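
For illustration, a parameter typed Union[int, timedelta] is typically normalised to integer milliseconds once, at the API boundary, whatever it ends up being called. The helper below is a standalone sketch with a hypothetical name (to_milliseconds); it is not a function from this library.

```python
from datetime import timedelta
from typing import Union


def to_milliseconds(value: Union[int, timedelta]) -> int:
    """Normalise an int (already in ms) or a timedelta to integer milliseconds."""
    if isinstance(value, timedelta):
        # Integer division by a 1 ms timedelta avoids float rounding.
        return value // timedelta(milliseconds=1)
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError(f"expected int or timedelta, got {type(value).__name__}")
    return value


gap_ms = to_milliseconds(timedelta(minutes=30))
print(gap_ms)  # 1800000
```

With the rename suggested above, a call site would pass gap_ms (or inactivity_gap_ms) instead of timeout_ms; the conversion itself is unaffected by the name.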

@@ -348,6 +348,7 @@ def __init__(
        reducer: Callable[[R, Any], R],
        initializer: Callable[[Any], R],
    ) -> None:
        super().__init__()

Wasn't this a part of a separate bugfix PR? If not, then it should be separated together with the corresponding test case.

state._transaction.delete_window(  # type: ignore # noqa: SLF001
    window_start,
    window_end,
    prefix=state._prefix,  # type: ignore # noqa: SLF001

Also here: we can't use a private member like this.
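
One common way to avoid reaching into _transaction and _prefix from the caller is to expose the operation on the state object itself. The sketch below uses hypothetical stand-in classes (FakeTransaction, FakeWindowedState) and is not the library's actual state API; it only shows the delegation pattern.

```python
from typing import Any, Dict, Optional, Tuple


class FakeTransaction:
    """Hypothetical stand-in for the windowed store transaction."""

    def __init__(self) -> None:
        self._windows: Dict[Tuple[bytes, int, int], Any] = {}

    def update_window(self, start_ms: int, end_ms: int, value: Any, prefix: bytes) -> None:
        self._windows[(prefix, start_ms, end_ms)] = value

    def get_window(self, start_ms: int, end_ms: int, prefix: bytes) -> Optional[Any]:
        return self._windows.get((prefix, start_ms, end_ms))

    def delete_window(self, start_ms: int, end_ms: int, prefix: bytes) -> None:
        self._windows.pop((prefix, start_ms, end_ms), None)


class FakeWindowedState:
    """Hypothetical state wrapper that owns the prefix and the transaction."""

    def __init__(self, transaction: FakeTransaction, prefix: bytes) -> None:
        self._transaction = transaction
        self._prefix = prefix

    def update_window(self, start_ms: int, end_ms: int, value: Any) -> None:
        self._transaction.update_window(start_ms, end_ms, value, prefix=self._prefix)

    def get_window(self, start_ms: int, end_ms: int) -> Optional[Any]:
        return self._transaction.get_window(start_ms, end_ms, prefix=self._prefix)

    def delete_window(self, start_ms: int, end_ms: int) -> None:
        # Public entry point: callers never touch _transaction or _prefix.
        self._transaction.delete_window(start_ms, end_ms, prefix=self._prefix)


state = FakeWindowedState(FakeTransaction(), prefix=b"user-1")
state.update_window(0, 10, value=3)
state.delete_window(0, 10)  # no private attribute access at the call site
```

The cleanup loop would then call state.delete_window(window_start, window_end) directly, and the type: ignore and noqa: SLF001 markers would no longer be needed.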
