diff --git a/rholang/src/rust/interpreter/spaces/channel_store/array_store.rs b/rholang/src/rust/interpreter/spaces/channel_store/array_store.rs new file mode 100644 index 000000000..4bf5d109c --- /dev/null +++ b/rholang/src/rust/interpreter/spaces/channel_store/array_store.rs @@ -0,0 +1,556 @@ +//! ArrayChannelStore: Fixed-size indexed channel storage. + +use std::collections::HashMap; +use std::hash::Hash; +use std::marker::PhantomData; + +use super::{ChannelStore, DataCollection, ContinuationCollection, SpaceId, SpaceError, AllocationMode}; + +/// Array-based channel store with fixed size. +/// +/// Channels are created from indices via a channel factory function. +/// Gensym returns the next available index wrapped as a channel, and +/// returns `OutOfNames` when the array is full (unless cyclic mode is enabled). +/// +/// # Type Parameters +/// +/// - `C`: Channel type (e.g., `Par`, created from index via channel_factory) +/// - `P`: Pattern type for continuation matching +/// - `A`: Data type stored in channels +/// - `K`: Continuation type +/// - `DC`: Data collection type +/// - `CC`: Continuation collection type +/// +/// # Design Document Alignment +/// +/// Per the Reifying RSpaces design document (lines 194-204): +/// - Array channels are allocated sequentially via gensym (indices 0, 1, 2, ...) 
+/// - Indices are wrapped in Unforgeable{} so clients can't forge them +/// - Non-cyclic: returns OutOfNames error when capacity exceeded +/// - Cyclic: wraps around (ring buffer semantics) +#[derive(Debug)] +pub struct ArrayChannelStore +where + C: Clone + Eq + Hash, + DC: Clone, + CC: Clone, +{ + /// Data collections indexed by position (internal usize index) + data: Vec>, + + /// Continuation collections keyed by channel pattern + continuations: HashMap, CC>, + + /// Join patterns: channel -> list of join patterns it participates in + joins: HashMap>>, + + /// Channel to internal index mapping (for data access) + channel_to_index: HashMap, + + /// Index to channel mapping (for export and iteration) + index_to_channel: Vec>, + + /// Maximum size of the array + max_size: usize, + + /// Whether to wrap around when full + cyclic: bool, + + /// Next index to allocate + next_index: usize, + + /// Space ID for channel creation + space_id: SpaceId, + + /// Factory function to create channels from indices + channel_factory: fn(&SpaceId, usize) -> C, + + /// Extractor function to get index from a channel (reverse of channel_factory) + /// Returns Some(index) if the channel matches the expected pattern, None otherwise + index_extractor: fn(&SpaceId, &C) -> Option, + + /// Factory function to create new data collections + data_factory: fn() -> DC, + + /// Factory function to create new continuation collections + cont_factory: fn() -> CC, + + /// PhantomData to track P, A, K + _phantom: PhantomData<(P, A, K)>, +} + +impl Clone for ArrayChannelStore +where + C: Clone + Eq + Hash, + DC: Clone, + CC: Clone, +{ + fn clone(&self) -> Self { + ArrayChannelStore { + data: self.data.clone(), + continuations: self.continuations.clone(), + joins: self.joins.clone(), + channel_to_index: self.channel_to_index.clone(), + index_to_channel: self.index_to_channel.clone(), + max_size: self.max_size, + cyclic: self.cyclic, + next_index: self.next_index, + space_id: self.space_id.clone(), + 
channel_factory: self.channel_factory, + index_extractor: self.index_extractor, + data_factory: self.data_factory, + cont_factory: self.cont_factory, + _phantom: PhantomData, + } + } +} + +impl ArrayChannelStore +where + C: Clone + Eq + Hash + Send + Sync, + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: Clone + Send + Sync, + CC: Clone + Send + Sync, +{ + /// Create a new array channel store with the given channel factory. + /// + /// # Arguments + /// + /// * `max_size` - Maximum number of channels that can be allocated + /// * `cyclic` - If true, wrap around to index 0 when full (ring buffer) + /// * `space_id` - Space ID for channel creation (passed to channel_factory) + /// * `channel_factory` - Function to create a channel from (space_id, index) + /// * `index_extractor` - Function to extract index from a channel (reverse of channel_factory) + /// * `data_factory` - Function to create empty data collections + /// * `cont_factory` - Function to create empty continuation collections + pub fn new( + max_size: usize, + cyclic: bool, + space_id: SpaceId, + channel_factory: fn(&SpaceId, usize) -> C, + index_extractor: fn(&SpaceId, &C) -> Option, + data_factory: fn() -> DC, + cont_factory: fn() -> CC, + ) -> Self { + ArrayChannelStore { + data: vec![None; max_size], + continuations: HashMap::new(), + joins: HashMap::new(), + channel_to_index: HashMap::with_capacity(max_size), + index_to_channel: vec![None; max_size], + max_size, + cyclic, + next_index: 0, + space_id, + channel_factory, + index_extractor, + data_factory, + cont_factory, + _phantom: PhantomData, + } + } + + /// Get the internal index for a channel (immutable lookup only). + fn get_index(&self, channel: &C) -> Option { + self.channel_to_index.get(channel).copied() + } + + /// Try to extract index from a channel and register it if valid. + /// Returns the index if successful, None if the channel doesn't match this space. 
+ fn try_register_channel(&mut self, channel: &C) -> Option { + // First check if already registered + if let Some(index) = self.channel_to_index.get(channel) { + return Some(*index); + } + + // Try to extract index from the channel using the extractor + let extracted = (self.index_extractor)(&self.space_id, channel); + + let index = match extracted { + Some(idx) => idx, + None => { + // Extractor returned None - channel doesn't match this space's pattern + return None; + } + }; + + // Validate index is within bounds + if index >= self.max_size { + // For cyclic arrays, indices should have been wrapped by the reducer. + // If we see an out-of-bounds index, it means the reducer didn't wrap. + // This could happen if the channel was created before cyclic wrapping was added, + // or if there's a bug in the allocation logic. + if self.cyclic { + // For cyclic arrays, wrap the index as a fallback + let wrapped_index = index % self.max_size; + // Register with wrapped index + self.channel_to_index.insert(channel.clone(), wrapped_index); + self.index_to_channel[wrapped_index] = Some(channel.clone()); + return Some(wrapped_index); + } else { + return None; + } + } + + // Register the channel + self.channel_to_index.insert(channel.clone(), index); + self.index_to_channel[index] = Some(channel.clone()); + + Some(index) + } + + /// Create a normalized key for continuation lookup. + /// Channels in a join pattern are sorted for consistent lookup. 
+ fn normalize_channels(channels: &[C]) -> Vec + where + C: Ord, + { + let mut sorted = channels.to_vec(); + sorted.sort(); + sorted + } +} + +impl ChannelStore for ArrayChannelStore +where + C: Clone + Eq + Hash + Ord + Send + Sync, + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: DataCollection + Default + Clone + Send + Sync + 'static, + CC: ContinuationCollection + Default + Clone + Send + Sync, +{ + type Channel = C; + type Pattern = P; + type Data = A; + type Continuation = K; + type DataColl = DC; + type ContColl = CC; + + fn get_or_create_data_collection(&mut self, channel: &C) -> &mut DC { + // Try to register the channel if not already known (auto-registration for reducer-created channels) + let index = self.try_register_channel(channel) + .expect("Channel does not match this Array space pattern"); + + if self.data[index].is_none() { + self.data[index] = Some((self.data_factory)()); + } + self.data[index].as_mut().expect("Data collection should exist after creation") + } + + fn get_data_collection(&self, channel: &C) -> Option<&DC> { + let index = self.get_index(channel)?; + self.data.get(index)?.as_ref() + } + + fn get_data_collection_mut(&mut self, channel: &C) -> Option<&mut DC> { + let index = self.get_index(channel)?; + self.data.get_mut(index)?.as_mut() + } + + fn get_or_create_continuation_collection(&mut self, channels: &[C]) -> &mut CC { + let key = Self::normalize_channels(channels); + self.continuations + .entry(key) + .or_insert_with(|| (self.cont_factory)()) + } + + fn get_continuation_collection(&self, channels: &[C]) -> Option<&CC> { + let key = Self::normalize_channels(channels); + self.continuations.get(&key) + } + + fn get_continuation_collection_mut(&mut self, channels: &[C]) -> Option<&mut CC> { + let key = Self::normalize_channels(channels); + self.continuations.get_mut(&key) + } + + fn all_channels(&self) -> Vec<&C> { + self.index_to_channel + .iter() + .filter_map(|opt| 
opt.as_ref()) + .collect() + } + + fn gensym(&mut self, _space_id: &SpaceId) -> Result { + if self.next_index >= self.max_size { + if self.cyclic { + // Cyclic wrap-around: clear old mapping before reusing index + self.next_index = 0; + } else { + return Err(SpaceError::OutOfNames { + space_id: self.space_id.clone(), + max_size: self.max_size, + }); + } + } + + let index = self.next_index; + + // If cyclic and reusing an index, remove old channel mapping + if let Some(old_channel) = self.index_to_channel[index].take() { + self.channel_to_index.remove(&old_channel); + } + + // Create new channel from index using the factory + let channel = (self.channel_factory)(&self.space_id, index); + + // Store bidirectional mapping + self.channel_to_index.insert(channel.clone(), index); + self.index_to_channel[index] = Some(channel.clone()); + + self.next_index += 1; + Ok(channel) + } + + fn get_joins(&self, channel: &C) -> Vec> { + self.joins.get(channel).cloned().unwrap_or_default() + } + + fn put_join(&mut self, channels: Vec) { + for channel in &channels { + self.joins + .entry(channel.clone()) + .or_insert_with(Vec::new) + .push(channels.clone()); + } + } + + fn remove_join(&mut self, channels: &[C]) { + for channel in channels { + if let Some(joins) = self.joins.get_mut(channel) { + joins.retain(|j| j != channels); + } + } + } + + fn snapshot(&self) -> Self { + self.clone() + } + + fn clear(&mut self) { + for slot in &mut self.data { + *slot = None; + } + self.continuations.clear(); + self.joins.clear(); + self.channel_to_index.clear(); + for slot in &mut self.index_to_channel { + *slot = None; + } + self.next_index = 0; + } + + fn is_empty(&self) -> bool { + self.data.iter().all(|d| d.is_none()) && self.continuations.is_empty() + } + + fn export_data(&self) -> Vec<(C, DC)> { + self.index_to_channel + .iter() + .enumerate() + .filter_map(|(i, opt_channel)| { + opt_channel.as_ref().and_then(|channel| { + self.data.get(i)?.as_ref().map(|dc| (channel.clone(), dc.clone())) + 
}) + }) + .collect() + } + + fn export_continuations(&self) -> Vec<(Vec, CC)> { + self.continuations + .iter() + .map(|(cs, cc)| (cs.clone(), cc.clone())) + .collect() + } + + fn export_joins(&self) -> Vec<(C, Vec>)> { + self.joins + .iter() + .map(|(c, js)| (c.clone(), js.clone())) + .collect() + } + + fn for_each_data(&self, mut f: F) + where + F: FnMut(&C, &DC), + { + for (i, opt_channel) in self.index_to_channel.iter().enumerate() { + if let Some(channel) = opt_channel { + if let Some(Some(dc)) = self.data.get(i) { + f(channel, dc); + } + } + } + } + + fn for_each_continuation(&self, mut f: F) + where + F: FnMut(&[C], &CC), + { + for (cs, cc) in &self.continuations { + f(cs, cc); + } + } + + fn for_each_join(&self, mut f: F) + where + F: FnMut(&C, &[Vec]), + { + for (c, js) in &self.joins { + f(c, js); + } + } + + fn gensym_counter(&self) -> usize { + self.next_index + } + + fn import_data(&mut self, data: Vec<(C, DC)>) { + // Clear existing data and mappings + for slot in &mut self.data { + *slot = None; + } + self.channel_to_index.clear(); + for slot in &mut self.index_to_channel { + *slot = None; + } + + // Import new data - we need to reconstruct the channel mappings + // Since we don't know the original indices, we allocate sequentially + let mut next_idx = 0; + for (channel, dc) in data { + if next_idx < self.max_size { + self.data[next_idx] = Some(dc); + self.channel_to_index.insert(channel.clone(), next_idx); + self.index_to_channel[next_idx] = Some(channel); + next_idx += 1; + } + } + self.next_index = next_idx; + } + + fn import_continuations(&mut self, continuations: Vec<(Vec, CC)>) { + self.continuations.clear(); + for (channels, cc) in continuations { + self.continuations.insert(channels, cc); + } + } + + fn import_joins(&mut self, joins: Vec<(C, Vec>)>) { + self.joins.clear(); + for (channel, join_patterns) in joins { + self.joins.insert(channel, join_patterns); + } + } + + fn set_gensym_counter(&mut self, counter: usize) { + self.next_index = 
counter; + } + + fn allocation_mode(&self) -> AllocationMode { + AllocationMode::ArrayIndex { + max_size: self.max_size, + cyclic: self.cyclic, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rust::interpreter::spaces::collections::{BagDataCollection, BagContinuationCollection}; + + /// Helper function to create a String channel from an index (for tests) + fn string_channel_factory(_space_id: &SpaceId, index: usize) -> String { + format!("channel_{}", index) + } + + /// Helper function to extract index from a String channel (for tests) + fn string_index_extractor(_space_id: &SpaceId, channel: &String) -> Option { + channel.strip_prefix("channel_")?.parse().ok() + } + + #[test] + fn test_array_store_out_of_names() { + let mut store: ArrayChannelStore, BagContinuationCollection> = + ArrayChannelStore::new( + 3, // Only 3 slots + false, // Not cyclic + SpaceId::default_space(), + string_channel_factory, + string_index_extractor, + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let space_id = SpaceId::default_space(); + store.gensym(&space_id).unwrap(); + store.gensym(&space_id).unwrap(); + store.gensym(&space_id).unwrap(); + + // Fourth should fail + let result = store.gensym(&space_id); + assert!(matches!(result, Err(SpaceError::OutOfNames { .. 
}))); + } + + #[test] + fn test_array_store_cyclic() { + let mut store: ArrayChannelStore, BagContinuationCollection> = + ArrayChannelStore::new( + 3, + true, // Cyclic + SpaceId::default_space(), + string_channel_factory, + string_index_extractor, + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let space_id = SpaceId::default_space(); + assert_eq!(store.gensym(&space_id).unwrap(), "channel_0"); + assert_eq!(store.gensym(&space_id).unwrap(), "channel_1"); + assert_eq!(store.gensym(&space_id).unwrap(), "channel_2"); + assert_eq!(store.gensym(&space_id).unwrap(), "channel_0"); // Wraps around + } + + #[test] + fn test_array_allocation_mode_returns_array_index() { + let store: ArrayChannelStore, BagContinuationCollection> = + ArrayChannelStore::new( + 10, // max_size + false, // cyclic + SpaceId::default_space(), + string_channel_factory, + string_index_extractor, + BagDataCollection::new, + BagContinuationCollection::new, + ); + + assert_eq!( + store.allocation_mode(), + AllocationMode::ArrayIndex { max_size: 10, cyclic: false } + ); + } + + #[test] + fn test_array_allocation_mode_cyclic() { + let store: ArrayChannelStore, BagContinuationCollection> = + ArrayChannelStore::new( + 5, // max_size + true, // cyclic + SpaceId::default_space(), + string_channel_factory, + string_index_extractor, + BagDataCollection::new, + BagContinuationCollection::new, + ); + + assert_eq!( + store.allocation_mode(), + AllocationMode::ArrayIndex { max_size: 5, cyclic: true } + ); + } +} diff --git a/rholang/src/rust/interpreter/spaces/channel_store/hashmap_store.rs b/rholang/src/rust/interpreter/spaces/channel_store/hashmap_store.rs new file mode 100644 index 000000000..316fc9ec1 --- /dev/null +++ b/rholang/src/rust/interpreter/spaces/channel_store/hashmap_store.rs @@ -0,0 +1,357 @@ +//! HashMapChannelStore: HashMap-based channel storage with O(1) lookup. 
+ +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::marker::PhantomData; + +use super::{ChannelStore, DataCollection, ContinuationCollection, SpaceId, SpaceError}; + +/// HashMap-based channel store - O(1) lookup by channel. +/// +/// This is the most common storage type, providing fast random access +/// to any channel by its key. +/// +/// # Type Parameters +/// +/// - `C`: Channel type +/// - `P`: Pattern type for continuation matching +/// - `A`: Data type stored in channels +/// - `K`: Continuation type +/// - `DC`: Data collection type +/// - `CC`: Continuation collection type +#[derive(Debug)] +pub struct HashMapChannelStore +where + C: Clone + Eq + Hash, +{ + /// Data collections indexed by channel + data: HashMap, + + /// Continuation collections indexed by channel pattern (sorted channel vec) + continuations: HashMap, CC>, + + /// Join patterns: channel -> list of join patterns it participates in + joins: HashMap>>, + + /// Counter for generating unique channel names + gensym_counter: AtomicUsize, + + /// Factory function to create new data collections + data_factory: fn() -> DC, + + /// Factory function to create new continuation collections + cont_factory: fn() -> CC, + + /// PhantomData to track P, A, K + _phantom: PhantomData<(P, A, K)>, +} + +impl Clone for HashMapChannelStore +where + C: Clone + Eq + Hash, + DC: Clone, + CC: Clone, +{ + fn clone(&self) -> Self { + HashMapChannelStore { + data: self.data.clone(), + continuations: self.continuations.clone(), + joins: self.joins.clone(), + gensym_counter: AtomicUsize::new(self.gensym_counter.load(Ordering::SeqCst)), + data_factory: self.data_factory, + cont_factory: self.cont_factory, + _phantom: PhantomData, + } + } +} + +impl HashMapChannelStore +where + C: Clone + Eq + Hash + Send + Sync, + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: Clone + Send + Sync, + CC: Clone + Send 
+ Sync, +{ + /// Create a new HashMap channel store with the given factory functions. + pub fn new(data_factory: fn() -> DC, cont_factory: fn() -> CC) -> Self { + HashMapChannelStore { + data: HashMap::new(), + continuations: HashMap::new(), + joins: HashMap::new(), + gensym_counter: AtomicUsize::new(0), + data_factory, + cont_factory, + _phantom: PhantomData, + } + } + + /// Create a normalized key for continuation lookup. + /// Channels in a join pattern are sorted for consistent lookup. + fn normalize_channels(channels: &[C]) -> Vec + where + C: Ord, + { + let mut sorted = channels.to_vec(); + sorted.sort(); + sorted + } +} + +impl ChannelStore for HashMapChannelStore +where + C: Clone + Eq + Hash + Ord + Send + Sync + From, + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: DataCollection + Default + Clone + Send + Sync + 'static, + CC: ContinuationCollection + Default + Clone + Send + Sync, +{ + type Channel = C; + type Pattern = P; + type Data = A; + type Continuation = K; + type DataColl = DC; + type ContColl = CC; + + fn get_or_create_data_collection(&mut self, channel: &C) -> &mut DC { + self.data + .entry(channel.clone()) + .or_insert_with(|| (self.data_factory)()) + } + + fn get_data_collection(&self, channel: &C) -> Option<&DC> { + self.data.get(channel) + } + + fn get_data_collection_mut(&mut self, channel: &C) -> Option<&mut DC> { + self.data.get_mut(channel) + } + + fn get_or_create_continuation_collection(&mut self, channels: &[C]) -> &mut CC { + let key = Self::normalize_channels(channels); + self.continuations + .entry(key) + .or_insert_with(|| (self.cont_factory)()) + } + + fn get_continuation_collection(&self, channels: &[C]) -> Option<&CC> { + let key = Self::normalize_channels(channels); + self.continuations.get(&key) + } + + fn get_continuation_collection_mut(&mut self, channels: &[C]) -> Option<&mut CC> { + let key = Self::normalize_channels(channels); + 
self.continuations.get_mut(&key) + } + + fn all_channels(&self) -> Vec<&C> { + self.data.keys().collect() + } + + fn gensym(&mut self, _space_id: &SpaceId) -> Result { + let id = self.gensym_counter.fetch_add(1, Ordering::SeqCst); + Ok(C::from(id)) + } + + fn get_joins(&self, channel: &C) -> Vec> { + self.joins.get(channel).cloned().unwrap_or_default() + } + + fn put_join(&mut self, channels: Vec) { + for channel in &channels { + self.joins + .entry(channel.clone()) + .or_insert_with(Vec::new) + .push(channels.clone()); + } + } + + fn remove_join(&mut self, channels: &[C]) { + for channel in channels { + if let Some(joins) = self.joins.get_mut(channel) { + joins.retain(|j| j != channels); + } + } + } + + fn snapshot(&self) -> Self { + Self { + data: self.data.clone(), + continuations: self.continuations.clone(), + joins: self.joins.clone(), + gensym_counter: AtomicUsize::new(self.gensym_counter.load(Ordering::SeqCst)), + data_factory: self.data_factory, + cont_factory: self.cont_factory, + _phantom: PhantomData, + } + } + + fn clear(&mut self) { + self.data.clear(); + self.continuations.clear(); + self.joins.clear(); + } + + fn is_empty(&self) -> bool { + self.data.is_empty() && self.continuations.is_empty() + } + + fn export_data(&self) -> Vec<(C, DC)> { + self.data + .iter() + .map(|(c, dc)| (c.clone(), dc.clone())) + .collect() + } + + fn export_continuations(&self) -> Vec<(Vec, CC)> { + self.continuations + .iter() + .map(|(cs, cc)| (cs.clone(), cc.clone())) + .collect() + } + + fn export_joins(&self) -> Vec<(C, Vec>)> { + self.joins + .iter() + .map(|(c, js)| (c.clone(), js.clone())) + .collect() + } + + fn for_each_data(&self, mut f: F) + where + F: FnMut(&C, &DC), + { + for (c, dc) in &self.data { + f(c, dc); + } + } + + fn for_each_continuation(&self, mut f: F) + where + F: FnMut(&[C], &CC), + { + for (cs, cc) in &self.continuations { + f(cs, cc); + } + } + + fn for_each_join(&self, mut f: F) + where + F: FnMut(&C, &[Vec]), + { + for (c, js) in &self.joins 
{ + f(c, js); + } + } + + fn gensym_counter(&self) -> usize { + self.gensym_counter.load(Ordering::SeqCst) + } + + fn import_data(&mut self, data: Vec<(C, DC)>) { + self.data.clear(); + for (channel, dc) in data { + self.data.insert(channel, dc); + } + } + + fn import_continuations(&mut self, continuations: Vec<(Vec, CC)>) { + self.continuations.clear(); + for (channels, cc) in continuations { + self.continuations.insert(channels, cc); + } + } + + fn import_joins(&mut self, joins: Vec<(C, Vec>)>) { + self.joins.clear(); + for (channel, join_patterns) in joins { + self.joins.insert(channel, join_patterns); + } + } + + fn set_gensym_counter(&mut self, counter: usize) { + self.gensym_counter.store(counter, Ordering::SeqCst); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rust::interpreter::spaces::collections::{BagDataCollection, BagContinuationCollection}; + + #[test] + fn test_hashmap_store_basic() { + let mut store: HashMapChannelStore, BagContinuationCollection> = + HashMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let channel = 42usize; + let dc = store.get_or_create_data_collection(&channel); + dc.put(100).unwrap(); + + assert_eq!(store.get_data_collection(&channel).unwrap().len(), 1); + } + + #[test] + fn test_hashmap_gensym() { + let mut store: HashMapChannelStore, BagContinuationCollection> = + HashMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let space_id = SpaceId::default_space(); + let c1 = store.gensym(&space_id).unwrap(); + let c2 = store.gensym(&space_id).unwrap(); + let c3 = store.gensym(&space_id).unwrap(); + + assert_eq!(c1, 0); + assert_eq!(c2, 1); + assert_eq!(c3, 2); + } + + #[test] + fn test_join_patterns() { + let mut store: HashMapChannelStore, BagContinuationCollection> = + HashMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + store.put_join(vec![1, 2, 3]); + store.put_join(vec![1, 4]); + + let 
joins = store.get_joins(&1); + assert_eq!(joins.len(), 2); + + store.remove_join(&[1, 2, 3]); + let joins = store.get_joins(&1); + assert_eq!(joins.len(), 1); + } + + #[test] + fn test_hashmap_does_not_support_prefix_semantics() { + let store: HashMapChannelStore, BagContinuationCollection> = + HashMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + assert!(!store.supports_prefix_semantics()); + } + + #[test] + fn test_hashmap_allocation_mode_returns_random() { + use crate::rust::interpreter::spaces::types::AllocationMode; + + let store: HashMapChannelStore, BagContinuationCollection> = + HashMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + assert_eq!(store.allocation_mode(), AllocationMode::Random); + } +} diff --git a/rholang/src/rust/interpreter/spaces/channel_store/hashset_store.rs b/rholang/src/rust/interpreter/spaces/channel_store/hashset_store.rs new file mode 100644 index 000000000..ebb576a9c --- /dev/null +++ b/rholang/src/rust/interpreter/spaces/channel_store/hashset_store.rs @@ -0,0 +1,455 @@ +//! HashSetChannelStore: HashSet-based channel storage with O(1) presence checking. + +use std::collections::{HashMap, HashSet}; +use std::hash::Hash; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::marker::PhantomData; + +use super::{ChannelStore, DataCollection, ContinuationCollection, SpaceId, SpaceError}; + +/// HashSet-based channel store for presence-only semantics. +/// +/// This store tracks channel presence using a HashSet for efficient +/// existence checking. Used for Seq (sequential process) spaces where +/// only the presence of data matters, not the full content for matching. +/// +/// Data is still stored in the backing HashMap, but the HashSet provides +/// fast O(1) presence checks without iteration. 
+/// +/// # Type Parameters +/// +/// - `C`: Channel type +/// - `P`: Pattern type for continuation matching +/// - `A`: Data type stored in channels +/// - `K`: Continuation type +/// - `DC`: Data collection type +/// - `CC`: Continuation collection type +#[derive(Debug)] +pub struct HashSetChannelStore +where + C: Clone + Eq + Hash, +{ + /// Set of channels that have data present + data_present: HashSet, + + /// Data collections indexed by channel + data: HashMap, + + /// Continuation collections indexed by channel pattern + continuations: HashMap, CC>, + + /// Join patterns: channel -> list of join patterns it participates in + joins: HashMap>>, + + /// Counter for generating unique channel names + gensym_counter: AtomicUsize, + + /// Factory function to create new data collections + data_factory: fn() -> DC, + + /// Factory function to create new continuation collections + cont_factory: fn() -> CC, + + /// PhantomData for unused type parameters + _phantom: PhantomData<(P, A, K)>, +} + +impl Clone for HashSetChannelStore +where + C: Clone + Eq + Hash, + DC: Clone, + CC: Clone, +{ + fn clone(&self) -> Self { + HashSetChannelStore { + data_present: self.data_present.clone(), + data: self.data.clone(), + continuations: self.continuations.clone(), + joins: self.joins.clone(), + gensym_counter: AtomicUsize::new(self.gensym_counter.load(Ordering::SeqCst)), + data_factory: self.data_factory, + cont_factory: self.cont_factory, + _phantom: PhantomData, + } + } +} + +impl HashSetChannelStore +where + C: Clone + Eq + Hash + Send + Sync, + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: Clone + Send + Sync, + CC: Clone + Send + Sync, +{ + /// Create a new HashSet channel store with the given factory functions. 
+ pub fn new(data_factory: fn() -> DC, cont_factory: fn() -> CC) -> Self { + HashSetChannelStore { + data_present: HashSet::new(), + data: HashMap::new(), + continuations: HashMap::new(), + joins: HashMap::new(), + gensym_counter: AtomicUsize::new(0), + data_factory, + cont_factory, + _phantom: PhantomData, + } + } + + /// Check if a channel has any data present (O(1) lookup). + pub fn has_data(&self, channel: &C) -> bool { + self.data_present.contains(channel) + } + + /// Get all channels that have data present. + pub fn channels_with_data(&self) -> impl Iterator { + self.data_present.iter() + } + + /// Mark a channel as having data. + pub fn mark_data_present(&mut self, channel: &C) { + self.data_present.insert(channel.clone()); + } + + /// Mark a channel as not having data. + pub fn mark_data_absent(&mut self, channel: &C) { + self.data_present.remove(channel); + } + + /// Create a normalized key for continuation lookup. + fn normalize_channels(channels: &[C]) -> Vec + where + C: Ord, + { + let mut sorted = channels.to_vec(); + sorted.sort(); + sorted + } +} + +impl ChannelStore for HashSetChannelStore +where + C: Clone + Eq + Hash + Ord + Send + Sync + From, + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: DataCollection + Default + Clone + Send + Sync + 'static, + CC: ContinuationCollection + Default + Clone + Send + Sync, +{ + type Channel = C; + type Pattern = P; + type Data = A; + type Continuation = K; + type DataColl = DC; + type ContColl = CC; + + fn get_or_create_data_collection(&mut self, channel: &C) -> &mut DC { + // Mark as present when creating data collection + self.data_present.insert(channel.clone()); + self.data + .entry(channel.clone()) + .or_insert_with(|| (self.data_factory)()) + } + + fn get_data_collection(&self, channel: &C) -> Option<&DC> { + // Fast path: check presence first + if !self.data_present.contains(channel) { + return None; + } + self.data.get(channel) + } + + fn 
get_data_collection_mut(&mut self, channel: &C) -> Option<&mut DC> {
        // A channel's data is only visible while it is marked present; the
        // backing map may still hold a collection after `mark_data_absent`.
        if !self.data_present.contains(channel) {
            return None;
        }
        self.data.get_mut(channel)
    }

    fn get_or_create_continuation_collection(&mut self, channels: &[C]) -> &mut CC {
        let key = Self::normalize_channels(channels);
        self.continuations
            .entry(key)
            .or_insert_with(|| (self.cont_factory)())
    }

    fn get_continuation_collection(&self, channels: &[C]) -> Option<&CC> {
        let key = Self::normalize_channels(channels);
        self.continuations.get(&key)
    }

    fn get_continuation_collection_mut(&mut self, channels: &[C]) -> Option<&mut CC> {
        let key = Self::normalize_channels(channels);
        self.continuations.get_mut(&key)
    }

    fn all_channels(&self) -> Vec<&C> {
        // Presence set, not the data map, is the source of truth for "live" channels.
        self.data_present.iter().collect()
    }

    fn gensym(&mut self, _space_id: &SpaceId) -> Result<C, SpaceError> {
        // Sequential allocation; the channel is built from the raw counter value.
        let id = self.gensym_counter.fetch_add(1, Ordering::SeqCst);
        Ok(C::from(id))
    }

    fn get_joins(&self, channel: &C) -> Vec<Vec<C>> {
        self.joins.get(channel).cloned().unwrap_or_default()
    }

    fn put_join(&mut self, channels: Vec<C>) {
        // Register the join pattern under every participating channel.
        for channel in &channels {
            self.joins
                .entry(channel.clone())
                .or_insert_with(Vec::new)
                .push(channels.clone());
        }
    }

    fn remove_join(&mut self, channels: &[C]) {
        for channel in channels {
            if let Some(joins) = self.joins.get_mut(channel) {
                joins.retain(|j| j != channels);
            }
        }
    }

    fn snapshot(&self) -> Self {
        Self {
            data_present: self.data_present.clone(),
            data: self.data.clone(),
            continuations: self.continuations.clone(),
            joins: self.joins.clone(),
            // AtomicUsize is not Clone; copy the current value instead.
            gensym_counter: AtomicUsize::new(self.gensym_counter.load(Ordering::SeqCst)),
            data_factory: self.data_factory,
            cont_factory: self.cont_factory,
            _phantom: PhantomData,
        }
    }

    fn clear(&mut self) {
        self.data_present.clear();
        self.data.clear();
        self.continuations.clear();
        self.joins.clear();
    }

    fn is_empty(&self) -> bool {
        self.data_present.is_empty() && self.continuations.is_empty()
    }

    // NOTE(review): the export/iteration methods below walk `self.data`
    // directly and do NOT consult `data_present`, so channels marked absent
    // are still exported, while `import_data` marks every imported channel
    // present. Confirm this asymmetry is intended.
    fn export_data(&self) -> Vec<(C, DC)> {
        self.data
            .iter()
            .map(|(c, dc)| (c.clone(), dc.clone()))
            .collect()
    }

    fn export_continuations(&self) -> Vec<(Vec<C>, CC)> {
        self.continuations
            .iter()
            .map(|(cs, cc)| (cs.clone(), cc.clone()))
            .collect()
    }

    fn export_joins(&self) -> Vec<(C, Vec<Vec<C>>)> {
        self.joins
            .iter()
            .map(|(c, js)| (c.clone(), js.clone()))
            .collect()
    }

    fn for_each_data<F>(&self, mut f: F)
    where
        F: FnMut(&C, &DC),
    {
        for (c, dc) in &self.data {
            f(c, dc);
        }
    }

    fn for_each_continuation<F>(&self, mut f: F)
    where
        F: FnMut(&[C], &CC),
    {
        for (cs, cc) in &self.continuations {
            f(cs, cc);
        }
    }

    fn for_each_join<F>(&self, mut f: F)
    where
        F: FnMut(&C, &[Vec<C>]),
    {
        for (c, js) in &self.joins {
            f(c, js);
        }
    }

    fn gensym_counter(&self) -> usize {
        self.gensym_counter.load(Ordering::SeqCst)
    }

    fn import_data(&mut self, data: Vec<(C, DC)>) {
        // Replace existing state; every imported channel is marked present.
        self.data.clear();
        self.data_present.clear();
        for (channel, dc) in data {
            self.data_present.insert(channel.clone());
            self.data.insert(channel, dc);
        }
    }

    fn import_continuations(&mut self, continuations: Vec<(Vec<C>, CC)>) {
        self.continuations.clear();
        for (channels, cc) in continuations {
            self.continuations.insert(channels, cc);
        }
    }

    fn import_joins(&mut self, joins: Vec<(C, Vec<Vec<C>>)>) {
        self.joins.clear();
        for (channel, join_patterns) in joins {
            self.joins.insert(channel, join_patterns);
        }
    }

    fn set_gensym_counter(&mut self, counter: usize) {
        self.gensym_counter.store(counter, Ordering::SeqCst);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::rust::interpreter::spaces::collections::{BagDataCollection, BagContinuationCollection};

    // NOTE(review): the concrete generic arguments were lost in extraction and
    // have been reconstructed as <C, P, A, K, DC, CC> =
    // <usize, (), i32, (), BagDataCollection<i32>, BagContinuationCollection<(), ()>>.
    // Confirm the pattern/continuation parameters against the collections module.
    type TestStore = HashSetChannelStore<
        usize,
        (),
        i32,
        (),
        BagDataCollection<i32>,
        BagContinuationCollection<(), ()>,
    >;

    /// Build a fresh store with bag-backed collections for each test.
    fn new_store() -> TestStore {
        HashSetChannelStore::new(BagDataCollection::new, BagContinuationCollection::new)
    }

    #[test]
    fn test_hashset_store_basic() {
        let mut store = new_store();

        let channel = 42usize;
        let dc = store.get_or_create_data_collection(&channel);
        dc.put(100).expect("put should succeed");

        assert!(store.has_data(&channel));
        assert_eq!(
            store
                .get_data_collection(&channel)
                .expect("collection should exist")
                .len(),
            1
        );
    }

    #[test]
    fn test_hashset_gensym() {
        let mut store = new_store();

        let space_id = SpaceId::default_space();
        let c1 = store.gensym(&space_id).expect("gensym should succeed");
        let c2 = store.gensym(&space_id).expect("gensym should succeed");
        let c3 = store.gensym(&space_id).expect("gensym should succeed");

        assert_eq!(c1, 0);
        assert_eq!(c2, 1);
        assert_eq!(c3, 2);
    }

    #[test]
    fn test_hashset_presence_tracking() {
        let mut store = new_store();

        let channel = 42usize;

        // Initially no data present
        assert!(!store.has_data(&channel));
        assert!(store.get_data_collection(&channel).is_none());

        // After creating data collection, it's marked present
        store.get_or_create_data_collection(&channel);
        assert!(store.has_data(&channel));

        // Can manually mark absent
        store.mark_data_absent(&channel);
        assert!(!store.has_data(&channel));

        // Can manually mark present
        store.mark_data_present(&channel);
        assert!(store.has_data(&channel));
    }

    #[test]
    fn test_hashset_channels_with_data() {
        let mut store = new_store();

        store.get_or_create_data_collection(&1);
        store.get_or_create_data_collection(&2);
        store.get_or_create_data_collection(&3);

        let channels: Vec<_> = store.channels_with_data().collect();
        assert_eq!(channels.len(), 3);
        assert!(channels.contains(&&1));
        assert!(channels.contains(&&2));
        assert!(channels.contains(&&3));
    }

    #[test]
    fn test_hashset_clear() {
        let mut store = new_store();

        store.get_or_create_data_collection(&1);
        store.get_or_create_data_collection(&2);
        assert!(!store.is_empty());

        store.clear();
        assert!(store.is_empty());
        assert!(!store.has_data(&1));
        assert!(!store.has_data(&2));
    }

    #[test]
    fn test_hashset_join_patterns() {
        let mut store = new_store();

        store.put_join(vec![1, 2, 3]);
        store.put_join(vec![1, 4]);

        let joins = store.get_joins(&1);
        assert_eq!(joins.len(), 2);

        store.remove_join(&[1, 2, 3]);
        let joins = store.get_joins(&1);
        assert_eq!(joins.len(), 1);
    }

    #[test]
    fn test_hashset_allocation_mode_returns_random() {
        use crate::rust::interpreter::spaces::types::AllocationMode;

        let store = new_store();

        assert_eq!(store.allocation_mode(), AllocationMode::Random);
    }
}
diff --git a/rholang/src/rust/interpreter/spaces/channel_store/mod.rs b/rholang/src/rust/interpreter/spaces/channel_store/mod.rs
new file mode 100644
index 000000000..941a42118
--- /dev/null
+++ b/rholang/src/rust/interpreter/spaces/channel_store/mod.rs
//! Layer 2: Outer Storage Structures (Channel Store)
//!
//! This module defines traits and implementations for how channels are indexed
//! and organized within a space. Different storage types provide different
//! performance characteristics and capabilities.
//!
//! # Storage Types
//!
//! - **HashMap**: O(1) lookup by channel key
//! - **PathMap**: Hierarchical paths with prefix matching (for MeTTa)
//!
- **Array**: Fixed size, gensym returns indices +//! - **Vector**: Unbounded, gensym grows the vector +//! - **HashSet**: Presence-only for sequential processes + +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::marker::PhantomData; + +// DataCollection and ContinuationCollection are used by consumers of this module +pub use super::errors::SpaceError; +pub use super::types::{AllocationMode, SpaceId}; +pub use super::collections::{DataCollection, ContinuationCollection}; + +// ========================================================================== +// Channel Store Trait +// ========================================================================== + +/// Trait for channel storage - how channels are indexed and accessed. +/// +/// This trait uses associated types to specify the complete type configuration +/// for a channel store. This allows `GenericRSpace` to infer all types +/// from the store type alone. +/// +/// # Associated Types +/// +/// - `Channel`: The channel/name type (e.g., `Par`, `usize`, `Vec`) +/// - `Pattern`: The pattern type for continuation matching +/// - `Data`: The data type stored in channels +/// - `Continuation`: The continuation type +/// - `DataColl`: The data collection type (e.g., `BagDataCollection`) +/// - `ContColl`: The continuation collection type +/// +/// # Prefix Semantics +/// +/// Some storage types (PathMap) support prefix semantics where data sent on a +/// child path is visible when receiving at a parent prefix. Stores can indicate +/// this capability via `supports_prefix_semantics()`. 
+/// +/// When prefix semantics are enabled: +/// - `produce(@[0,1,2], data)` stores data at exact path `@[0,1,2]` +/// - `consume(@[0,1], pattern)` can receive data from `@[0,1,2]` with suffix `[2]` +/// +/// # Formal Correspondence +/// - `PathMapStore.v`: Prefix aggregation theorems +/// - `PathMapQuantale.v`: Path concatenation properties +/// +/// # Design Document Alignment +/// +/// This trait aligns with the design document's `ChannelStore` specification +/// (lines 405-441) but uses fully associated types to enable `GenericRSpace` +/// (2 parameters) instead of `GenericRSpace` (8 parameters). +pub trait ChannelStore: Clone + Send + Sync { + /// The channel type (e.g., `Par`, `usize`, `Vec`). + type Channel: Clone + Eq + Hash + Send + Sync; + + /// The pattern type for continuation matching. + type Pattern: Clone + Send + Sync; + + /// The data type stored in channels. + type Data: Clone + Send + Sync + std::fmt::Debug + 'static; + + /// The continuation type. + type Continuation: Clone + Send + Sync; + + /// The data collection type for storing data at channels. + type DataColl: DataCollection + Default + Clone + Send + Sync + 'static; + + /// The continuation collection type for storing continuations. + type ContColl: ContinuationCollection + Default + Clone + Send + Sync; + + // ========================================================================= + // Core Channel Operations + // ========================================================================= + + /// Get or create a data collection for the given channel. + fn get_or_create_data_collection(&mut self, channel: &Self::Channel) -> &mut Self::DataColl; + + /// Get a data collection for the given channel if it exists. + fn get_data_collection(&self, channel: &Self::Channel) -> Option<&Self::DataColl>; + + /// Get a mutable data collection for the given channel if it exists. 
+ fn get_data_collection_mut(&mut self, channel: &Self::Channel) -> Option<&mut Self::DataColl>; + + /// Get or create a continuation collection for the given channel pattern. + fn get_or_create_continuation_collection(&mut self, channels: &[Self::Channel]) -> &mut Self::ContColl; + + /// Get a continuation collection for the given channel pattern if it exists. + fn get_continuation_collection(&self, channels: &[Self::Channel]) -> Option<&Self::ContColl>; + + /// Get a mutable continuation collection for the given channel pattern. + fn get_continuation_collection_mut(&mut self, channels: &[Self::Channel]) -> Option<&mut Self::ContColl>; + + /// Get all channels in the store. + fn all_channels(&self) -> Vec<&Self::Channel>; + + /// Generate a new unique channel name. + fn gensym(&mut self, space_id: &SpaceId) -> Result; + + /// Get join patterns for a channel (channels that participate in joins with this channel). + fn get_joins(&self, channel: &Self::Channel) -> Vec>; + + /// Record a join pattern. + fn put_join(&mut self, channels: Vec); + + /// Remove a join pattern. + fn remove_join(&mut self, channels: &[Self::Channel]); + + /// Create a snapshot of the store for checkpointing. + fn snapshot(&self) -> Self; + + /// Clear all data and continuations from the store. + fn clear(&mut self); + + /// Check if the store is empty. + fn is_empty(&self) -> bool; + + /// Export all data collections for serialization. + /// + /// Returns a vector of (channel, data_collection) pairs. + /// Used for checkpointing state to persistent storage. + fn export_data(&self) -> Vec<(Self::Channel, Self::DataColl)>; + + /// Export all continuation collections for serialization. + /// + /// Returns a vector of (channel_pattern, continuation_collection) pairs. + /// Used for checkpointing state to persistent storage. + fn export_continuations(&self) -> Vec<(Vec, Self::ContColl)>; + + /// Export all join patterns for serialization. 
+ /// + /// Returns a vector of (channel, join_patterns) pairs. + /// Used for checkpointing state to persistent storage. + fn export_joins(&self) -> Vec<(Self::Channel, Vec>)>; + + // ========================================================================= + // Zero-Copy Iteration (Callback-Based) + // ========================================================================= + + /// Iterate over all data collections without cloning. + /// + /// This is more efficient than `export_data()` when you only need to + /// read the data without taking ownership. Useful for: + /// - Streaming serialization + /// - Computing statistics + /// - Inspection/debugging + /// + /// # Arguments + /// * `f` - Callback function called for each (channel, data_collection) pair + fn for_each_data(&self, f: F) + where + F: FnMut(&Self::Channel, &Self::DataColl); + + /// Iterate over all continuation collections without cloning. + /// + /// This is more efficient than `export_continuations()` when you only need + /// to read the continuations without taking ownership. + /// + /// # Arguments + /// * `f` - Callback function called for each (channels, continuation_collection) pair + fn for_each_continuation(&self, f: F) + where + F: FnMut(&[Self::Channel], &Self::ContColl); + + /// Iterate over all join patterns without cloning. + /// + /// This is more efficient than `export_joins()` when you only need + /// to read the join patterns without taking ownership. + /// + /// # Arguments + /// * `f` - Callback function called for each (channel, join_patterns) pair + fn for_each_join(&self, f: F) + where + F: FnMut(&Self::Channel, &[Vec]); + + /// Get the current gensym counter value for serialization. + fn gensym_counter(&self) -> usize; + + /// Import data collections from deserialized state. + /// + /// Replaces existing data with the provided collections. 
+ fn import_data(&mut self, data: Vec<(Self::Channel, Self::DataColl)>); + + /// Import continuation collections from deserialized state. + /// + /// Replaces existing continuations with the provided collections. + fn import_continuations(&mut self, continuations: Vec<(Vec, Self::ContColl)>); + + /// Import join patterns from deserialized state. + /// + /// Replaces existing joins with the provided patterns. + fn import_joins(&mut self, joins: Vec<(Self::Channel, Vec>)>); + + /// Set the gensym counter value from deserialized state. + fn set_gensym_counter(&mut self, counter: usize); + + // ========================================================================= + // Prefix Semantics (PathMap Support) + // ========================================================================= + + /// Check if this store supports prefix semantics. + /// + /// When true, data at path `@[0,1,2]` can be received by a consumer at `@[0,1]`. + /// The suffix `[2]` is attached to the data as a key for pattern matching. + /// + /// # Default + /// Returns `false` for most stores. Only PathMapChannelStore returns `true`. + fn supports_prefix_semantics(&self) -> bool { + false + } + + /// Get all channels that have the given channel as a prefix. + /// + /// For PathMap, this returns all paths that start with `prefix`. + /// For other stores, returns an empty vector (no prefix relationship). + /// + /// # Example (PathMap) + /// If store contains `@[0,1,2]` and `@[0,1,3]`, then: + /// `channels_with_prefix(@[0,1])` returns `[@[0,1,2], @[0,1,3]]` + fn channels_with_prefix(&self, _prefix: &Self::Channel) -> Vec { + Vec::new() + } + + /// Get all prefixes of the given channel. + /// + /// For PathMap, returns all prefix paths from shortest to longest. + /// For other stores, returns an empty vector (no prefix relationship). 
+ /// + /// # Example (PathMap) + /// `channel_prefixes(@[0,1,2])` returns `[@[0], @[0,1], @[0,1,2]]` + fn channel_prefixes(&self, _channel: &Self::Channel) -> Vec { + Vec::new() + } + + /// Get all continuation patterns that involve the given channel as a prefix. + /// + /// This finds continuations waiting on prefix paths of the given channel. + /// Used during `produce()` to find matching continuations at prefix paths. + /// + /// # Returns + /// Vector of (pattern_channels, continuation_collection) pairs where + /// at least one channel in pattern_channels is a prefix of `channel`. + fn continuation_patterns_for_prefix(&self, _channel: &Self::Channel) -> Vec<(&Vec, &Self::ContColl)> { + Vec::new() + } + + /// Compute the suffix key for a descendant channel relative to a prefix channel. + /// + /// When a consumer at prefix path `@[0,1]` receives data from `@[0,1,2,3]`, + /// the suffix key is `[2,3]` - the path elements that extend the prefix. + /// + /// # Returns + /// - `Some(suffix_bytes)` if descendant is a proper descendant of prefix + /// - `None` if the channels are equal (exact match, no suffix) + /// - `None` if the channels are not in a prefix relationship + /// + /// # Default + /// Returns `None` for most stores. Only PathMap stores implement this. + fn compute_suffix_key(&self, _prefix: &Self::Channel, _descendant: &Self::Channel) -> Option> { + None + } + + // ========================================================================= + // Allocation Mode (Array/Vector Index Support) + // ========================================================================= + + /// Get the allocation mode for `new` bindings within this space. 
+ /// + /// Different storage types use different allocation strategies: + /// - **Random**: HashMap, PathMap, HashSet use cryptographic random IDs + /// - **ArrayIndex**: Array stores use sequential indices up to max_size + /// - **VectorIndex**: Vector stores use growing indices + /// + /// # Default + /// Returns `AllocationMode::Random` for most stores. + fn allocation_mode(&self) -> AllocationMode { + AllocationMode::Random + } +} + +// Module declarations +mod hashmap_store; +mod array_store; +mod vector_store; +mod pathmap_store; +mod rholang_pathmap_store; +mod hashset_store; +mod vectordb_store; + +// Re-export all store types +pub use hashmap_store::HashMapChannelStore; +pub use array_store::ArrayChannelStore; +pub use vector_store::VectorChannelStore; +pub use pathmap_store::PathMapChannelStore; +pub use rholang_pathmap_store::RholangPathMapStore; +pub use hashset_store::HashSetChannelStore; +pub use vectordb_store::VectorDBChannelStore; diff --git a/rholang/src/rust/interpreter/spaces/channel_store/pathmap_store.rs b/rholang/src/rust/interpreter/spaces/channel_store/pathmap_store.rs new file mode 100644 index 000000000..f1eb677b7 --- /dev/null +++ b/rholang/src/rust/interpreter/spaces/channel_store/pathmap_store.rs @@ -0,0 +1,919 @@ +//! PathMapChannelStore: PathMap-based channel storage with hierarchical prefix matching. + +use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::marker::PhantomData; + +use super::{ChannelStore, DataCollection, ContinuationCollection, SpaceId, SpaceError}; + +/// PathMap-based channel store with hierarchical prefix matching. +/// +/// Channels are represented as byte paths. Data sent on a specific path +/// is accessible to continuations listening on any prefix of that path. +/// For example, data on @[0,1,2] is accessible at @[0,1] and @[0]. +/// +/// This is designed for MeTTa integration where hierarchical namespacing +/// and prefix-based queries are common patterns. 
+/// +/// # Type Parameters +/// +/// - `P`: Pattern type for continuation matching +/// - `A`: Data type stored in channels +/// - `K`: Continuation type +/// - `DC`: Data collection type +/// - `CC`: Continuation collection type +#[derive(Debug)] +pub struct PathMapChannelStore +where + DC: Clone, + CC: Clone, +{ + /// Data collections indexed by path (as byte vector) + data: HashMap, DC>, + + /// Continuation collections indexed by channel pattern + continuations: HashMap>, CC>, + + /// Join patterns: path -> list of join patterns it participates in + joins: HashMap, Vec>>>, + + /// Counter for generating unique paths + gensym_counter: AtomicUsize, + + /// Factory function to create new data collections + data_factory: fn() -> DC, + + /// Factory function to create new continuation collections + cont_factory: fn() -> CC, + + /// PhantomData for unused type parameters + _phantom: PhantomData<(P, A, K)>, +} + +impl Clone for PathMapChannelStore +where + DC: Clone, + CC: Clone, +{ + fn clone(&self) -> Self { + PathMapChannelStore { + data: self.data.clone(), + continuations: self.continuations.clone(), + joins: self.joins.clone(), + gensym_counter: AtomicUsize::new(self.gensym_counter.load(Ordering::SeqCst)), + data_factory: self.data_factory, + cont_factory: self.cont_factory, + _phantom: PhantomData, + } + } +} + +impl PathMapChannelStore +where + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: Clone + Send + Sync, + CC: Clone + Send + Sync, +{ + /// Create a new PathMap channel store with the given factory functions. + pub fn new(data_factory: fn() -> DC, cont_factory: fn() -> CC) -> Self { + PathMapChannelStore { + data: HashMap::new(), + continuations: HashMap::new(), + joins: HashMap::new(), + gensym_counter: AtomicUsize::new(0), + data_factory, + cont_factory, + _phantom: PhantomData, + } + } + + /// Get all paths that are prefixes of the given path (including the path itself). 
+ /// For example, for path [0, 1, 2], returns [[0], [0, 1], [0, 1, 2]]. + pub fn prefixes(path: &[u8]) -> Vec> { + (1..=path.len()) + .map(|len| path[..len].to_vec()) + .collect() + } + + /// Get all paths in the store that the given path is a prefix of. + /// For example, if path is [0, 1], returns all stored paths starting with [0, 1]. + fn paths_with_prefix<'a>(&'a self, prefix: &[u8]) -> Vec<&'a Vec> { + self.data + .keys() + .filter(|path| path.starts_with(prefix)) + .collect() + } + + /// Create a normalized key for continuation lookup. + fn normalize_channels(channels: &[Vec]) -> Vec> { + let mut sorted = channels.to_vec(); + sorted.sort(); + sorted + } +} + +impl ChannelStore for PathMapChannelStore +where + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: DataCollection + Default + Clone + Send + Sync + 'static, + CC: ContinuationCollection + Default + Clone + Send + Sync, +{ + type Channel = Vec; + type Pattern = P; + type Data = A; + type Continuation = K; + type DataColl = DC; + type ContColl = CC; + + fn get_or_create_data_collection(&mut self, channel: &Vec) -> &mut DC { + self.data + .entry(channel.clone()) + .or_insert_with(|| (self.data_factory)()) + } + + fn get_data_collection(&self, channel: &Vec) -> Option<&DC> { + self.data.get(channel) + } + + fn get_data_collection_mut(&mut self, channel: &Vec) -> Option<&mut DC> { + self.data.get_mut(channel) + } + + fn get_or_create_continuation_collection(&mut self, channels: &[Vec]) -> &mut CC { + let key = Self::normalize_channels(channels); + self.continuations + .entry(key) + .or_insert_with(|| (self.cont_factory)()) + } + + fn get_continuation_collection(&self, channels: &[Vec]) -> Option<&CC> { + let key = Self::normalize_channels(channels); + self.continuations.get(&key) + } + + fn get_continuation_collection_mut(&mut self, channels: &[Vec]) -> Option<&mut CC> { + let key = Self::normalize_channels(channels); + 
self.continuations.get_mut(&key) + } + + fn all_channels(&self) -> Vec<&Vec> { + self.data.keys().collect() + } + + fn gensym(&mut self, _space_id: &SpaceId) -> Result, SpaceError> { + let id = self.gensym_counter.fetch_add(1, Ordering::SeqCst); + // Generate a unique path based on the counter + Ok(id.to_be_bytes().to_vec()) + } + + fn get_joins(&self, channel: &Vec) -> Vec>> { + self.joins.get(channel).cloned().unwrap_or_default() + } + + fn put_join(&mut self, channels: Vec>) { + for channel in &channels { + self.joins + .entry(channel.clone()) + .or_insert_with(Vec::new) + .push(channels.clone()); + } + } + + fn remove_join(&mut self, channels: &[Vec]) { + for channel in channels { + if let Some(joins) = self.joins.get_mut(channel) { + joins.retain(|j| j != channels); + } + } + } + + fn snapshot(&self) -> Self { + Self { + data: self.data.clone(), + continuations: self.continuations.clone(), + joins: self.joins.clone(), + gensym_counter: AtomicUsize::new(self.gensym_counter.load(Ordering::SeqCst)), + data_factory: self.data_factory, + cont_factory: self.cont_factory, + _phantom: PhantomData, + } + } + + fn clear(&mut self) { + self.data.clear(); + self.continuations.clear(); + self.joins.clear(); + } + + fn is_empty(&self) -> bool { + self.data.is_empty() && self.continuations.is_empty() + } + + fn export_data(&self) -> Vec<(Self::Channel, Self::DataColl)> { + self.data + .iter() + .map(|(c, dc)| (c.clone(), dc.clone())) + .collect() + } + + fn export_continuations(&self) -> Vec<(Vec>, CC)> { + self.continuations + .iter() + .map(|(cs, cc)| (cs.clone(), cc.clone())) + .collect() + } + + fn export_joins(&self) -> Vec<(Vec, Vec>>)> { + self.joins + .iter() + .map(|(c, js)| (c.clone(), js.clone())) + .collect() + } + + fn for_each_data(&self, mut f: F) + where + F: FnMut(&Vec, &DC), + { + for (c, dc) in &self.data { + f(c, dc); + } + } + + fn for_each_continuation(&self, mut f: F) + where + F: FnMut(&[Vec], &CC), + { + for (cs, cc) in &self.continuations { + f(cs, 
cc); + } + } + + fn for_each_join(&self, mut f: F) + where + F: FnMut(&Vec, &[Vec>]), + { + for (c, js) in &self.joins { + f(c, js); + } + } + + fn gensym_counter(&self) -> usize { + self.gensym_counter.load(Ordering::SeqCst) + } + + fn import_data(&mut self, data: Vec<(Vec, DC)>) { + self.data.clear(); + for (channel, dc) in data { + self.data.insert(channel, dc); + } + } + + fn import_continuations(&mut self, continuations: Vec<(Vec>, CC)>) { + self.continuations.clear(); + for (channels, cc) in continuations { + self.continuations.insert(channels, cc); + } + } + + fn import_joins(&mut self, joins: Vec<(Vec, Vec>>)>) { + self.joins.clear(); + for (channel, join_patterns) in joins { + self.joins.insert(channel, join_patterns); + } + } + + fn set_gensym_counter(&mut self, counter: usize) { + self.gensym_counter.store(counter, Ordering::SeqCst); + } + + // ========================================================================= + // Prefix Semantics Implementation + // ========================================================================= + + fn supports_prefix_semantics(&self) -> bool { + true + } + + fn channels_with_prefix(&self, prefix: &Vec) -> Vec> { + self.data + .keys() + .filter(|path| path.starts_with(prefix)) + .cloned() + .collect() + } + + fn channel_prefixes(&self, channel: &Vec) -> Vec> { + Self::prefixes(channel) + } + + fn continuation_patterns_for_prefix(&self, channel: &Vec) -> Vec<(&Vec>, &CC)> { + let channel_prefixes = Self::prefixes(channel); + + self.continuations + .iter() + .filter(|(pattern_channels, _cc)| { + // Check if any channel in the pattern is a prefix of the given channel + pattern_channels.iter().any(|pattern_ch| { + channel_prefixes.contains(pattern_ch) + }) + }) + .collect() + } +} + +impl PathMapChannelStore +where + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: super::DataCollection + Default + Clone + Send + Sync + 'static, + CC: Clone + Send + Sync, +{ + 
/// Get data from a channel and all its descendants (paths that have this path as prefix). + /// This enables hierarchical queries where a listener on @[0] can receive data from @[0,1,2]. + pub fn get_data_with_descendants(&self, prefix: &[u8]) -> Vec<(&Vec, &DC)> { + self.paths_with_prefix(prefix) + .into_iter() + .filter_map(|path| self.data.get(path).map(|dc| (path, dc))) + .collect() + } + + /// Get data from a channel and all its ancestors (prefixes of this path). + /// This enables hierarchical routing where data at @[0,1,2] is accessible at @[0,1]. + pub fn get_data_with_ancestors(&self, path: &[u8]) -> Vec<(Vec, &DC)> { + Self::prefixes(path) + .into_iter() + .filter_map(|prefix| self.data.get(&prefix).map(|dc| (prefix, dc))) + .collect() + } + + // ========================================================================= + // Quantale Operations (Lattice + Path Multiplication) + // ========================================================================= + // + // PathMap forms a quantale: a lattice with an associative multiplication. + // - Lattice join (∪): union of channel sets + // - Lattice meet (∩): intersection of channel sets + // - Subtraction (-): set difference + // - Multiplication (*): Cartesian product path concatenation + + /// Union of channel sets (lattice join). + /// + /// Returns a new store containing all channels from both stores. + /// Data from self takes precedence for channels present in both. 
+ pub fn union_channels(&self, other: &Self) -> Self { + let mut result = self.clone(); + + // Add all channels from other that aren't in self + for (path, dc) in &other.data { + result.data.entry(path.clone()).or_insert_with(|| dc.clone()); + } + + // Merge continuations + for (key, cc) in &other.continuations { + result.continuations.entry(key.clone()).or_insert_with(|| cc.clone()); + } + + // Merge joins + for (path, joins) in &other.joins { + result.joins + .entry(path.clone()) + .or_insert_with(Vec::new) + .extend(joins.clone()); + } + + result + } + + /// Intersection of channel sets (lattice meet). + /// + /// Returns a new store containing only channels present in both stores. + /// Data from self is used for channels in the intersection. + pub fn intersect_channels(&self, other: &Self) -> Self { + let mut result = PathMapChannelStore::new(self.data_factory, self.cont_factory); + + // Only include channels that are in both + for (path, dc) in &self.data { + if other.data.contains_key(path) { + result.data.insert(path.clone(), dc.clone()); + } + } + + // Only include continuation patterns where all channels are in intersection + for (key, cc) in &self.continuations { + let all_in_intersection = key.iter().all(|ch| other.data.contains_key(ch)); + if all_in_intersection { + result.continuations.insert(key.clone(), cc.clone()); + } + } + + result + } + + /// Subtraction of channel sets (distributive lattice operation). + /// + /// Returns a new store containing channels in self but not in other. 
+ pub fn subtract_channels(&self, other: &Self) -> Self { + let mut result = PathMapChannelStore::new(self.data_factory, self.cont_factory); + + // Include channels that are in self but not in other + for (path, dc) in &self.data { + if !other.data.contains_key(path) { + result.data.insert(path.clone(), dc.clone()); + } + } + + // Only include continuation patterns where no channel is in other + for (key, cc) in &self.continuations { + let none_in_other = key.iter().all(|ch| !other.data.contains_key(ch)); + if none_in_other { + result.continuations.insert(key.clone(), cc.clone()); + } + } + + // Preserve joins for remaining channels + for (path, joins) in &self.joins { + if result.data.contains_key(path) { + result.joins.insert(path.clone(), joins.clone()); + } + } + + result + } + + /// Path multiplication (quantale multiplication). + /// + /// Returns a new store with paths formed by concatenating every path in self + /// with every path in other: {a,b} * {c,d} = {ac, ad, bc, bd}. + /// + /// This operation is used for hierarchical namespace composition. + /// + /// # Performance Note + /// + /// This eagerly computes the full Cartesian product O(n₁ × n₂). + /// For lazy evaluation, use [`concat_paths_iter`] instead. + pub fn concat_paths(&self, other: &Self) -> Self { + let mut result = PathMapChannelStore::new(self.data_factory, self.cont_factory); + + // Cartesian product of paths + for (path1, dc1) in &self.data { + for (path2, _dc2) in &other.data { + let mut concatenated = path1.clone(); + concatenated.extend(path2); + // Use data from self for the concatenated path + result.data.insert(concatenated, dc1.clone()); + } + } + + result + } + + /// Lazy path multiplication iterator (quantale multiplication). + /// + /// Returns an iterator that yields concatenated paths on-demand without + /// eagerly computing the full Cartesian product. Each iteration produces + /// `(concatenated_path, data_collection_reference)`. 
+ /// + /// # Performance + /// + /// - Memory: O(1) per iteration (no intermediate storage) + /// - Time per iteration: O(|path1| + |path2|) for path cloning + /// - Total time if fully consumed: O(n₁ × n₂ × avg_path_len) + /// + /// Use this when you need only a subset of the product or want to + /// process results in a streaming fashion. + /// + /// # Example + /// + /// ```ignore + /// // Find first matching concatenation + /// let match = store1.concat_paths_iter(&store2) + /// .find(|(path, _)| path.starts_with(&prefix)); + /// ``` + pub fn concat_paths_iter<'a>( + &'a self, + other: &'a Self, + ) -> impl Iterator, &'a DC)> + 'a { + self.data.iter().flat_map(move |(path1, dc1)| { + other.data.keys().map(move |path2| { + let mut concatenated = path1.clone(); + concatenated.extend(path2); + (concatenated, dc1) + }) + }) + } + + /// Lazy path multiplication with both data collections. + /// + /// Similar to [`concat_paths_iter`] but yields references to both + /// data collections, useful when you need data from both stores. + pub fn concat_paths_iter_full<'a>( + &'a self, + other: &'a Self, + ) -> impl Iterator, &'a DC, &'a DC)> + 'a { + self.data.iter().flat_map(move |(path1, dc1)| { + other.data.iter().map(move |(path2, dc2)| { + let mut concatenated = path1.clone(); + concatenated.extend(path2); + (concatenated, dc1, dc2) + }) + }) + } + + /// Get all paths currently in the store. + pub fn all_paths(&self) -> Vec<&Vec> { + self.data.keys().collect() + } + + /// Check if a path exists in the store. + pub fn contains_path(&self, path: &[u8]) -> bool { + self.data.contains_key(path) + } + + /// Get the number of distinct paths in the store. 
+ pub fn path_count(&self) -> usize { + self.data.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rust::interpreter::spaces::collections::{BagDataCollection, BagContinuationCollection, ContinuationCollection, DataCollection}; + + #[test] + fn test_pathmap_store_basic() { + let mut store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let path = vec![0u8, 1, 2]; + let dc = store.get_or_create_data_collection(&path); + dc.put(100).expect("put should succeed"); + + assert_eq!(store.get_data_collection(&path).expect("collection should exist").len(), 1); + } + + #[test] + fn test_pathmap_gensym() { + let mut store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let space_id = SpaceId::default_space(); + let c1 = store.gensym(&space_id).expect("gensym should succeed"); + let c2 = store.gensym(&space_id).expect("gensym should succeed"); + let c3 = store.gensym(&space_id).expect("gensym should succeed"); + + // Each gensym should produce a unique path + assert_ne!(c1, c2); + assert_ne!(c2, c3); + assert_ne!(c1, c3); + } + + #[test] + fn test_pathmap_prefixes() { + let path = vec![0u8, 1, 2]; + let prefixes = PathMapChannelStore::, BagContinuationCollection>::prefixes(&path); + + assert_eq!(prefixes.len(), 3); + assert_eq!(prefixes[0], vec![0u8]); + assert_eq!(prefixes[1], vec![0u8, 1]); + assert_eq!(prefixes[2], vec![0u8, 1, 2]); + } + + #[test] + fn test_pathmap_hierarchical_queries() { + let mut store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + // Store data at different paths + store.get_or_create_data_collection(&vec![0u8]).put(10).expect("put should succeed"); + store.get_or_create_data_collection(&vec![0u8, 1]).put(20).expect("put should 
succeed"); + store.get_or_create_data_collection(&vec![0u8, 1, 2]).put(30).expect("put should succeed"); + store.get_or_create_data_collection(&vec![1u8]).put(100).expect("put should succeed"); + + // Get all descendants of [0] + let descendants = store.get_data_with_descendants(&[0u8]); + assert_eq!(descendants.len(), 3); // [0], [0,1], [0,1,2] + + // Get all ancestors of [0,1,2] + let ancestors = store.get_data_with_ancestors(&[0u8, 1, 2]); + assert_eq!(ancestors.len(), 3); // [0], [0,1], [0,1,2] + + // Get descendants of [1] + let descendants = store.get_data_with_descendants(&[1u8]); + assert_eq!(descendants.len(), 1); // only [1] + } + + #[test] + fn test_pathmap_join_patterns() { + let mut store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let path1 = vec![0u8, 1]; + let path2 = vec![0u8, 2]; + let path3 = vec![1u8]; + + store.put_join(vec![path1.clone(), path2.clone()]); + store.put_join(vec![path1.clone(), path3.clone()]); + + let joins = store.get_joins(&path1); + assert_eq!(joins.len(), 2); + + store.remove_join(&[path1.clone(), path2.clone()]); + let joins = store.get_joins(&path1); + assert_eq!(joins.len(), 1); + } + + #[test] + fn test_pathmap_union_channels() { + let mut store1: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + let mut store2: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + store1.get_or_create_data_collection(&vec![0u8, 1]).put(10).expect("put"); + store1.get_or_create_data_collection(&vec![0u8, 2]).put(20).expect("put"); + store2.get_or_create_data_collection(&vec![0u8, 2]).put(99).expect("put"); // overlap + store2.get_or_create_data_collection(&vec![1u8, 0]).put(30).expect("put"); + + let union = store1.union_channels(&store2); + + 
assert_eq!(union.path_count(), 3); // [0,1], [0,2], [1,0] + assert!(union.contains_path(&[0u8, 1])); + assert!(union.contains_path(&[0u8, 2])); + assert!(union.contains_path(&[1u8, 0])); + + // [0,2] should have store1's data (10 from store1 takes precedence) + let dc = union.get_data_collection(&vec![0u8, 2]).expect("should exist"); + assert_eq!(dc.len(), 1); + } + + #[test] + fn test_pathmap_intersect_channels() { + let mut store1: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + let mut store2: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + store1.get_or_create_data_collection(&vec![0u8, 1]).put(10).expect("put"); + store1.get_or_create_data_collection(&vec![0u8, 2]).put(20).expect("put"); + store2.get_or_create_data_collection(&vec![0u8, 2]).put(99).expect("put"); + store2.get_or_create_data_collection(&vec![1u8, 0]).put(30).expect("put"); + + let intersection = store1.intersect_channels(&store2); + + assert_eq!(intersection.path_count(), 1); // only [0,2] + assert!(!intersection.contains_path(&[0u8, 1])); // only in store1 + assert!(intersection.contains_path(&[0u8, 2])); // in both + assert!(!intersection.contains_path(&[1u8, 0])); // only in store2 + } + + #[test] + fn test_pathmap_subtract_channels() { + let mut store1: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + let mut store2: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + store1.get_or_create_data_collection(&vec![0u8, 1]).put(10).expect("put"); + store1.get_or_create_data_collection(&vec![0u8, 2]).put(20).expect("put"); + store1.get_or_create_data_collection(&vec![0u8, 3]).put(30).expect("put"); + store2.get_or_create_data_collection(&vec![0u8, 
2]).put(99).expect("put"); + + let difference = store1.subtract_channels(&store2); + + assert_eq!(difference.path_count(), 2); // [0,1] and [0,3] + assert!(difference.contains_path(&[0u8, 1])); + assert!(!difference.contains_path(&[0u8, 2])); // subtracted out + assert!(difference.contains_path(&[0u8, 3])); + } + + #[test] + fn test_pathmap_concat_paths() { + let mut store1: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + let mut store2: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + store1.get_or_create_data_collection(&vec![0u8]).put(1).expect("put"); + store1.get_or_create_data_collection(&vec![1u8]).put(2).expect("put"); + store2.get_or_create_data_collection(&vec![2u8]).put(3).expect("put"); + store2.get_or_create_data_collection(&vec![3u8]).put(4).expect("put"); + + let product = store1.concat_paths(&store2); + + // {[0], [1]} * {[2], [3]} = {[0,2], [0,3], [1,2], [1,3]} + assert_eq!(product.path_count(), 4); + assert!(product.contains_path(&[0u8, 2])); + assert!(product.contains_path(&[0u8, 3])); + assert!(product.contains_path(&[1u8, 2])); + assert!(product.contains_path(&[1u8, 3])); + } + + #[test] + fn test_pathmap_concat_paths_iter_lazy() { + // Test that lazy iterator produces same results as eager version + let mut store1: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + let mut store2: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + store1.get_or_create_data_collection(&vec![0u8]).put(1).expect("put"); + store1.get_or_create_data_collection(&vec![1u8]).put(2).expect("put"); + store2.get_or_create_data_collection(&vec![2u8]).put(3).expect("put"); + 
store2.get_or_create_data_collection(&vec![3u8]).put(4).expect("put"); + + // Collect lazy iterator results + let lazy_results: Vec<_> = store1.concat_paths_iter(&store2) + .map(|(path, _dc)| path) + .collect(); + + // Should produce same 4 paths as eager version + assert_eq!(lazy_results.len(), 4); + assert!(lazy_results.contains(&vec![0u8, 2])); + assert!(lazy_results.contains(&vec![0u8, 3])); + assert!(lazy_results.contains(&vec![1u8, 2])); + assert!(lazy_results.contains(&vec![1u8, 3])); + } + + #[test] + fn test_pathmap_concat_paths_iter_early_termination() { + // Test that lazy iterator allows early termination without computing all + let mut store1: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + let mut store2: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + // Create larger stores + for i in 0..10u8 { + store1.get_or_create_data_collection(&vec![i]).put(i as i32).expect("put"); + store2.get_or_create_data_collection(&vec![i + 10]).put(i as i32).expect("put"); + } + + // Find first path starting with [5] (should not compute all 100 combinations) + let found = store1.concat_paths_iter(&store2) + .find(|(path, _)| path.starts_with(&[5u8])); + + assert!(found.is_some()); + assert!(found.unwrap().0.starts_with(&[5u8])); + } + + #[test] + fn test_pathmap_quantale_associativity() { + // Test that path multiplication is associative: (A * B) * C == A * (B * C) + let mut a: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + let mut b: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + let mut c: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, 
BagContinuationCollection::new); + + a.get_or_create_data_collection(&vec![1u8]).put(1).expect("put"); + b.get_or_create_data_collection(&vec![2u8]).put(2).expect("put"); + c.get_or_create_data_collection(&vec![3u8]).put(3).expect("put"); + + let ab_c = a.concat_paths(&b).concat_paths(&c); + let a_bc = a.concat_paths(&b.concat_paths(&c)); + + // Both should produce the same paths + let mut paths1: Vec<_> = ab_c.all_paths().into_iter().cloned().collect(); + let mut paths2: Vec<_> = a_bc.all_paths().into_iter().cloned().collect(); + paths1.sort(); + paths2.sort(); + assert_eq!(paths1, paths2); + assert!(ab_c.contains_path(&[1u8, 2, 3])); + } + + // ========================================================================== + // PathMap Prefix Semantics Tests + // ========================================================================== + + #[test] + fn test_pathmap_supports_prefix_semantics() { + let store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + assert!(store.supports_prefix_semantics()); + } + + #[test] + fn test_pathmap_channels_with_prefix() { + let mut store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + store.get_or_create_data_collection(&vec![0u8, 1]); + store.get_or_create_data_collection(&vec![0u8, 1, 2]); + store.get_or_create_data_collection(&vec![0u8, 1, 3]); + store.get_or_create_data_collection(&vec![0u8, 2]); + store.get_or_create_data_collection(&vec![1u8]); + + let mut descendants = store.channels_with_prefix(&vec![0u8, 1]); + descendants.sort(); + + assert_eq!(descendants.len(), 3); + assert_eq!(descendants[0], vec![0u8, 1]); + assert_eq!(descendants[1], vec![0u8, 1, 2]); + assert_eq!(descendants[2], vec![0u8, 1, 3]); + } + + #[test] + fn test_pathmap_channel_prefixes() { + let store: PathMapChannelStore, BagContinuationCollection> = + 
PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + let prefixes = store.channel_prefixes(&vec![0u8, 1, 2]); + + assert_eq!(prefixes.len(), 3); + assert_eq!(prefixes[0], vec![0u8]); + assert_eq!(prefixes[1], vec![0u8, 1]); + assert_eq!(prefixes[2], vec![0u8, 1, 2]); + } + + #[test] + fn test_pathmap_continuation_patterns_for_prefix() { + let mut store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + // Add continuations at various paths + // Continuation at @[0, 1] - a prefix of @[0, 1, 2] + store.get_or_create_continuation_collection(&[vec![0u8, 1]]) + .put(vec!["pattern1".to_string()], "cont1".to_string(), false); + + // Continuation at @[0] - a shorter prefix of @[0, 1, 2] + store.get_or_create_continuation_collection(&[vec![0u8]]) + .put(vec!["pattern2".to_string()], "cont2".to_string(), false); + + // Continuation at @[1] - NOT a prefix of @[0, 1, 2] + store.get_or_create_continuation_collection(&[vec![1u8]]) + .put(vec!["pattern3".to_string()], "cont3".to_string(), false); + + // Find continuations that are at prefixes of @[0, 1, 2] + let matches = store.continuation_patterns_for_prefix(&vec![0u8, 1, 2]); + + assert_eq!(matches.len(), 2); + + // Verify we found the right continuations + let patterns: Vec<_> = matches.iter().map(|(p, _)| p.clone()).collect(); + assert!(patterns.contains(&&vec![vec![0u8, 1]])); + assert!(patterns.contains(&&vec![vec![0u8]])); + } + + #[test] + fn test_pathmap_prefix_semantics_spec_example() { + // From spec lines 159-192: + // @[0, 1, 2]!({|"hi"|}) | @[0, 1, 2]!({|"hello"|}) | @[0, 1, 3]!({|"there"|}) + // = @[0, 1]!({|[2, "hi"], [2, "hello"], [3, "there"]|}) + + let mut store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new(BagDataCollection::new, BagContinuationCollection::new); + + // Store data at specific paths + store.get_or_create_data_collection(&vec![0u8, 1, 
2]).put("hi".to_string()).expect("put"); + store.get_or_create_data_collection(&vec![0u8, 1, 2]).put("hello".to_string()).expect("put"); + store.get_or_create_data_collection(&vec![0u8, 1, 3]).put("there".to_string()).expect("put"); + + // Add a continuation waiting at the prefix @[0, 1] + store.get_or_create_continuation_collection(&[vec![0u8, 1]]) + .put(vec!["*".to_string()], "consumer".to_string(), false); + + // The continuation at @[0, 1] should be found when producing on @[0, 1, 2] or @[0, 1, 3] + let matches_for_012 = store.continuation_patterns_for_prefix(&vec![0u8, 1, 2]); + assert_eq!(matches_for_012.len(), 1); + + let matches_for_013 = store.continuation_patterns_for_prefix(&vec![0u8, 1, 3]); + assert_eq!(matches_for_013.len(), 1); + + // Get all descendants of @[0, 1] - should include data from @[0, 1, 2] and @[0, 1, 3] + let descendants = store.get_data_with_descendants(&[0u8, 1]); + assert_eq!(descendants.len(), 2); // @[0, 1, 2] and @[0, 1, 3] + } + + #[test] + fn test_pathmap_allocation_mode_returns_random() { + use crate::rust::interpreter::spaces::types::AllocationMode; + + let store: PathMapChannelStore, BagContinuationCollection> = + PathMapChannelStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + assert_eq!(store.allocation_mode(), AllocationMode::Random); + } +} diff --git a/rholang/src/rust/interpreter/spaces/channel_store/rholang_pathmap_store.rs b/rholang/src/rust/interpreter/spaces/channel_store/rholang_pathmap_store.rs new file mode 100644 index 000000000..0b71fcc5a --- /dev/null +++ b/rholang/src/rust/interpreter/spaces/channel_store/rholang_pathmap_store.rs @@ -0,0 +1,448 @@ +//! RholangPathMapStore: PathMap-aware channel store for Rholang with Par channels. 
+ +use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::marker::PhantomData; + +use models::rhoapi::Par; + +use super::{ChannelStore, DataCollection, ContinuationCollection, SpaceId, SpaceError}; +use super::super::types::{par_to_path, path_to_par, get_path_suffix, path_element_boundaries}; + +/// PathMap-aware channel store for Rholang that uses Par as channel type. +/// +/// This store enables PathMap prefix semantics with Rholang's Par-based channels. +/// It maintains an internal mapping from path bytes to Par channels, allowing +/// prefix-based queries on channels like `@[0, 1, 2]`. +/// +/// # Type Parameters +/// +/// - `P`: Pattern type for continuation matching +/// - `A`: Data type stored in channels +/// - `K`: Continuation type +/// - `DC`: Data collection type +/// - `CC`: Continuation collection type +/// +/// # Prefix Semantics +/// +/// When a Par channel represents a path (EList of integers 0-255): +/// - `produce(@[0,1,2], data)` stores data at path `[0, 1, 2]` +/// - `consume(@[0,1], pattern)` can receive data from `@[0,1,2]` with suffix `[2]` +/// +/// For non-path channels, the store falls back to exact matching (like HashMap). 
+/// +/// # Example +/// ```ignore +/// let store = RholangPathMapStore::new(BagDataCollection::new, BagContinuationCollection::new); +/// +/// // Path channel: prefix semantics enabled +/// let path_channel = path_to_par(&[0, 1, 2]); +/// +/// // Non-path channel: exact matching only +/// let string_channel = create_string_par("hello"); +/// ``` +#[derive(Debug)] +pub struct RholangPathMapStore +where + DC: Clone, + CC: Clone, +{ + /// Data collections indexed by Par channel + data: HashMap, + + /// Continuation collections indexed by channel pattern + continuations: HashMap, CC>, + + /// Join patterns: channel -> list of join patterns it participates in + joins: HashMap>>, + + /// Path index: maps path bytes to their Par representation for prefix queries + /// Only populated for channels that are valid paths (EList of ints) + path_index: HashMap, Par>, + + /// Counter for generating unique channel names + gensym_counter: AtomicUsize, + + /// Factory function to create new data collections + data_factory: fn() -> DC, + + /// Factory function to create new continuation collections + cont_factory: fn() -> CC, + + /// PhantomData for unused type parameters + _phantom: PhantomData<(P, A, K)>, +} + +impl Clone for RholangPathMapStore +where + DC: Clone, + CC: Clone, +{ + fn clone(&self) -> Self { + RholangPathMapStore { + data: self.data.clone(), + continuations: self.continuations.clone(), + joins: self.joins.clone(), + path_index: self.path_index.clone(), + gensym_counter: AtomicUsize::new(self.gensym_counter.load(Ordering::SeqCst)), + data_factory: self.data_factory, + cont_factory: self.cont_factory, + _phantom: PhantomData, + } + } +} + +impl RholangPathMapStore +where + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: Clone + Send + Sync, + CC: Clone + Send + Sync, +{ + /// Create a new Rholang PathMap store with the given factory functions. 
+ pub fn new(data_factory: fn() -> DC, cont_factory: fn() -> CC) -> Self { + RholangPathMapStore { + data: HashMap::new(), + continuations: HashMap::new(), + joins: HashMap::new(), + path_index: HashMap::new(), + gensym_counter: AtomicUsize::new(0), + data_factory, + cont_factory, + _phantom: PhantomData, + } + } + + /// Update the path index when adding a channel. + fn index_channel(&mut self, channel: &Par) { + if let Some(path) = par_to_path(channel) { + self.path_index.insert(path, channel.clone()); + } + } + + /// Remove from path index when removing a channel. + #[allow(dead_code)] + fn unindex_channel(&mut self, channel: &Par) { + if let Some(path) = par_to_path(channel) { + self.path_index.remove(&path); + } + } + + /// Get all Par channels whose paths have the given prefix. + /// + /// Returns channels where `par_to_path(channel)` starts with `prefix_path`. + pub fn channels_with_path_prefix(&self, prefix_path: &[u8]) -> Vec { + self.path_index + .iter() + .filter(|(path, _)| path.starts_with(prefix_path)) + .map(|(_, par)| par.clone()) + .collect() + } +} + +impl ChannelStore for RholangPathMapStore +where + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: DataCollection + Default + Clone + Send + Sync + 'static, + CC: ContinuationCollection + Default + Clone + Send + Sync, +{ + type Channel = Par; + type Pattern = P; + type Data = A; + type Continuation = K; + type DataColl = DC; + type ContColl = CC; + + fn get_or_create_data_collection(&mut self, channel: &Par) -> &mut DC { + // Index the channel if it's a path + self.index_channel(channel); + + self.data + .entry(channel.clone()) + .or_insert_with(|| (self.data_factory)()) + } + + fn get_data_collection(&self, channel: &Par) -> Option<&DC> { + self.data.get(channel) + } + + fn get_data_collection_mut(&mut self, channel: &Par) -> Option<&mut DC> { + self.data.get_mut(channel) + } + + fn get_or_create_continuation_collection(&mut self, 
channels: &[Par]) -> &mut CC { + let mut sorted = channels.to_vec(); + sorted.sort_by(|a, b| format!("{:?}", a).cmp(&format!("{:?}", b))); + self.continuations + .entry(sorted) + .or_insert_with(|| (self.cont_factory)()) + } + + fn get_continuation_collection(&self, channels: &[Par]) -> Option<&CC> { + let mut sorted = channels.to_vec(); + sorted.sort_by(|a, b| format!("{:?}", a).cmp(&format!("{:?}", b))); + self.continuations.get(&sorted) + } + + fn get_continuation_collection_mut(&mut self, channels: &[Par]) -> Option<&mut CC> { + let mut sorted = channels.to_vec(); + sorted.sort_by(|a, b| format!("{:?}", a).cmp(&format!("{:?}", b))); + self.continuations.get_mut(&sorted) + } + + fn all_channels(&self) -> Vec<&Par> { + self.data.keys().collect() + } + + fn gensym(&mut self, _space_id: &SpaceId) -> Result { + let id = self.gensym_counter.fetch_add(1, Ordering::SeqCst); + // Generate a unique path based on the counter + let path = id.to_be_bytes().to_vec(); + Ok(path_to_par(&path)) + } + + fn get_joins(&self, channel: &Par) -> Vec> { + self.joins.get(channel).cloned().unwrap_or_default() + } + + fn put_join(&mut self, channels: Vec) { + for channel in &channels { + self.joins + .entry(channel.clone()) + .or_insert_with(Vec::new) + .push(channels.clone()); + } + } + + fn remove_join(&mut self, channels: &[Par]) { + for channel in channels { + if let Some(joins) = self.joins.get_mut(channel) { + joins.retain(|j| j != channels); + } + } + } + + fn snapshot(&self) -> Self { + self.clone() + } + + fn clear(&mut self) { + self.data.clear(); + self.continuations.clear(); + self.joins.clear(); + self.path_index.clear(); + } + + fn is_empty(&self) -> bool { + self.data.is_empty() && self.continuations.is_empty() + } + + fn export_data(&self) -> Vec<(Par, DC)> { + self.data + .iter() + .map(|(c, dc)| (c.clone(), dc.clone())) + .collect() + } + + fn export_continuations(&self) -> Vec<(Vec, CC)> { + self.continuations + .iter() + .map(|(cs, cc)| (cs.clone(), cc.clone())) + 
.collect() + } + + fn export_joins(&self) -> Vec<(Par, Vec>)> { + self.joins + .iter() + .map(|(c, js)| (c.clone(), js.clone())) + .collect() + } + + fn for_each_data(&self, mut f: F) + where + F: FnMut(&Par, &DC), + { + for (c, dc) in &self.data { + f(c, dc); + } + } + + fn for_each_continuation(&self, mut f: F) + where + F: FnMut(&[Par], &CC), + { + for (cs, cc) in &self.continuations { + f(cs, cc); + } + } + + fn for_each_join(&self, mut f: F) + where + F: FnMut(&Par, &[Vec]), + { + for (c, js) in &self.joins { + f(c, js); + } + } + + fn gensym_counter(&self) -> usize { + self.gensym_counter.load(Ordering::SeqCst) + } + + fn import_data(&mut self, data: Vec<(Par, DC)>) { + self.data.clear(); + self.path_index.clear(); + for (channel, dc) in data { + self.index_channel(&channel); + self.data.insert(channel, dc); + } + } + + fn import_continuations(&mut self, continuations: Vec<(Vec, CC)>) { + self.continuations.clear(); + for (channels, cc) in continuations { + self.continuations.insert(channels, cc); + } + } + + fn import_joins(&mut self, joins: Vec<(Par, Vec>)>) { + self.joins.clear(); + for (channel, join_patterns) in joins { + self.joins.insert(channel, join_patterns); + } + } + + fn set_gensym_counter(&mut self, counter: usize) { + self.gensym_counter.store(counter, Ordering::SeqCst); + } + + // ========================================================================= + // Prefix Semantics Implementation + // ========================================================================= + + fn supports_prefix_semantics(&self) -> bool { + true + } + + fn channels_with_prefix(&self, prefix: &Par) -> Vec { + // Convert the prefix Par to a path + if let Some(prefix_path) = par_to_path(prefix) { + self.channels_with_path_prefix(&prefix_path) + } else { + // For non-path channels, only exact match + if self.data.contains_key(prefix) { + vec![prefix.clone()] + } else { + vec![] + } + } + } + + fn channel_prefixes(&self, channel: &Par) -> Vec { + // Convert the channel 
Par to a path + if let Some(path) = par_to_path(channel) { + // Get element boundaries in the tagged path encoding + let boundaries = path_element_boundaries(&path); + + // Generate prefix Pars at each element boundary + // For path @[0, 1, 2] (27 bytes), boundaries = [9, 18, 27] + // This generates: @[0] (bytes 0..9), @[0,1] (bytes 0..18), @[0,1,2] (bytes 0..27) + boundaries + .iter() + .map(|&boundary| path_to_par(&path[..boundary])) + .collect() + } else { + // For non-path channels, only return the channel itself + vec![channel.clone()] + } + } + + fn compute_suffix_key(&self, prefix: &Par, descendant: &Par) -> Option> { + // Convert both channels to paths + let prefix_path = par_to_path(prefix)?; + let descendant_path = par_to_path(descendant)?; + + // Get the suffix - returns None if not a prefix relationship + get_path_suffix(&prefix_path, &descendant_path) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rust::interpreter::spaces::collections::{BagDataCollection, BagContinuationCollection}; + + #[test] + fn test_rholang_pathmap_store_basic() { + let mut store: RholangPathMapStore, BagContinuationCollection> = + RholangPathMapStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let channel = path_to_par(&[0, 1, 2]); + let dc = store.get_or_create_data_collection(&channel); + dc.put(100).expect("put should succeed"); + + assert_eq!(store.get_data_collection(&channel).expect("collection should exist").len(), 1); + } + + #[test] + fn test_rholang_pathmap_gensym() { + let mut store: RholangPathMapStore, BagContinuationCollection> = + RholangPathMapStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + let space_id = SpaceId::default_space(); + let c1 = store.gensym(&space_id).expect("gensym should succeed"); + let c2 = store.gensym(&space_id).expect("gensym should succeed"); + let c3 = store.gensym(&space_id).expect("gensym should succeed"); + + // Each gensym should produce a unique channel + 
assert_ne!(c1, c2); + assert_ne!(c2, c3); + assert_ne!(c1, c3); + } + + #[test] + fn test_rholang_pathmap_supports_prefix_semantics() { + let store: RholangPathMapStore, BagContinuationCollection> = + RholangPathMapStore::new(BagDataCollection::new, BagContinuationCollection::new); + + assert!(store.supports_prefix_semantics()); + } + + #[test] + fn test_rholang_pathmap_path_indexing() { + let mut store: RholangPathMapStore, BagContinuationCollection> = + RholangPathMapStore::new( + BagDataCollection::new, + BagContinuationCollection::new, + ); + + // Create channels + let channel_0_1 = path_to_par(&[0, 1]); + let channel_0_1_2 = path_to_par(&[0, 1, 2]); + let channel_0_1_3 = path_to_par(&[0, 1, 3]); + let channel_0_2 = path_to_par(&[0, 2]); + + store.get_or_create_data_collection(&channel_0_1); + store.get_or_create_data_collection(&channel_0_1_2); + store.get_or_create_data_collection(&channel_0_1_3); + store.get_or_create_data_collection(&channel_0_2); + + // Get channels with prefix @[0, 1] + let prefix = path_to_par(&[0, 1]); + let prefix_path = par_to_path(&prefix).expect("should be valid path"); + let mut descendants = store.channels_with_path_prefix(&prefix_path); + descendants.sort_by(|a, b| format!("{:?}", a).cmp(&format!("{:?}", b))); + + // Should have @[0,1], @[0,1,2], @[0,1,3] + assert_eq!(descendants.len(), 3); + } +} diff --git a/rholang/src/rust/interpreter/spaces/channel_store/vector_store.rs b/rholang/src/rust/interpreter/spaces/channel_store/vector_store.rs new file mode 100644 index 000000000..952b6dc66 --- /dev/null +++ b/rholang/src/rust/interpreter/spaces/channel_store/vector_store.rs @@ -0,0 +1,442 @@ +//! VectorChannelStore: Dynamic indexed channel storage with unbounded growth. + +use std::collections::HashMap; +use std::hash::Hash; +use std::marker::PhantomData; + +use super::{ChannelStore, DataCollection, ContinuationCollection, SpaceId, SpaceError, AllocationMode}; + +/// Vector-based channel store with unbounded growth. 
///
/// Similar to Array but grows automatically without size limits.
/// Generic over channel type C for integration with Rholang spaces.
///
/// # Type Parameters
///
/// - `C`: Channel type (must support hashing and equality)
/// - `P`: Pattern type for continuation matching
/// - `A`: Data type stored in channels
/// - `K`: Continuation type
/// - `DC`: Data collection type
/// - `CC`: Continuation collection type
///
/// # Design Document Alignment
///
/// Per the Reifying RSpaces design document:
/// - Vector channels are allocated sequentially via gensym (indices 0, 1, 2, ...)
/// - Indices are wrapped in Unforgeable{} so clients can't forge them
/// - Unlike Array, Vector grows unbounded (until OOM)
#[derive(Debug)]
pub struct VectorChannelStore<C, P, A, K, DC, CC>
where
    C: Clone + Eq + Hash,
    DC: Clone,
    CC: Clone,
{
    /// Data collections indexed by position (internal usize index);
    /// `None` marks an index whose collection has not been created yet.
    data: Vec<Option<DC>>,

    /// Continuation collections keyed by channel pattern
    continuations: HashMap<Vec<C>, CC>,

    /// Join patterns: channel -> list of join patterns it participates in
    joins: HashMap<C, Vec<Vec<C>>>,

    /// Channel to internal index mapping (for data access)
    channel_to_index: HashMap<C, usize>,

    /// Index to channel mapping (for export and iteration)
    index_to_channel: Vec<C>,

    /// Space ID for channel creation
    space_id: SpaceId,

    /// Factory function to create channels from indices
    channel_factory: fn(&SpaceId, usize) -> C,

    /// Extractor function to get index from a channel (reverse of channel_factory).
    /// Returns Some(index) if the channel matches the expected pattern, None otherwise.
    index_extractor: fn(&SpaceId, &C) -> Option<usize>,

    /// Factory function to create new data collections
    data_factory: fn() -> DC,

    /// Factory function to create new continuation collections
    cont_factory: fn() -> CC,

    /// PhantomData to track P, A, K
    _phantom: PhantomData<(P, A, K)>,
}

impl<C, P, A, K, DC, CC> Clone for VectorChannelStore<C, P, A, K, DC, CC>
where
    C: Clone + Eq + Hash,
    DC: Clone,
    CC: Clone,
{
    fn clone(&self) -> Self {
        VectorChannelStore {
            data: self.data.clone(),
            continuations: self.continuations.clone(),
            joins: self.joins.clone(),
            channel_to_index: self.channel_to_index.clone(),
            index_to_channel: self.index_to_channel.clone(),
            space_id: self.space_id.clone(),
            // fn pointers are Copy.
            channel_factory: self.channel_factory,
            index_extractor: self.index_extractor,
            data_factory: self.data_factory,
            cont_factory: self.cont_factory,
            _phantom: PhantomData,
        }
    }
}

impl<C, P, A, K, DC, CC> VectorChannelStore<C, P, A, K, DC, CC>
where
    C: Clone + Eq + Hash + Send + Sync,
    P: Clone + Send + Sync,
    A: Clone + Send + Sync + std::fmt::Debug + 'static,
    K: Clone + Send + Sync,
    DC: Clone + Send + Sync,
    CC: Clone + Send + Sync,
{
    /// Create a new vector channel store with the given channel factory.
    ///
    /// # Arguments
    ///
    /// * `space_id` - Space ID for channel creation (passed to channel_factory)
    /// * `channel_factory` - Function to create a channel from (space_id, index)
    /// * `index_extractor` - Function to extract index from a channel (reverse of channel_factory)
    /// * `data_factory` - Function to create empty data collections
    /// * `cont_factory` - Function to create empty continuation collections
    pub fn new(
        space_id: SpaceId,
        channel_factory: fn(&SpaceId, usize) -> C,
        index_extractor: fn(&SpaceId, &C) -> Option<usize>,
        data_factory: fn() -> DC,
        cont_factory: fn() -> CC,
    ) -> Self {
        VectorChannelStore {
            data: Vec::new(),
            continuations: HashMap::new(),
            joins: HashMap::new(),
            channel_to_index: HashMap::new(),
            index_to_channel: Vec::new(),
            space_id,
            channel_factory,
            index_extractor,
            data_factory,
            cont_factory,
            _phantom: PhantomData,
        }
    }

    /// Get the internal index for a channel (immutable lookup only).
    fn get_index(&self, channel: &C) -> Option<usize> {
        self.channel_to_index.get(channel).copied()
    }

    /// Try to extract index from a channel and register it if valid.
    /// Returns the index if successful, None if the channel doesn't match this space.
    fn try_register_channel(&mut self, channel: &C) -> Option<usize> {
        // Fast path: channel already registered.
        if let Some(&index) = self.channel_to_index.get(channel) {
            return Some(index);
        }

        // `?`: an extractor returning None means the channel doesn't match
        // this space's pattern.
        let index = (self.index_extractor)(&self.space_id, channel)?;

        // Grow the data vector up to (and including) the extracted index.
        if self.data.len() <= index {
            self.data.resize_with(index + 1, || None);
        }

        // Register the channel in both directions. Unregistered slots below
        // `index` are filled with clones of this channel as placeholders
        // (same behavior as the original repeated-push loop); they are
        // overwritten when their real channel is registered.
        self.channel_to_index.insert(channel.clone(), index);
        if self.index_to_channel.len() <= index {
            self.index_to_channel.resize(index + 1, channel.clone());
        }
        self.index_to_channel[index] = channel.clone();

        Some(index)
    }

    /// Create a normalized key for continuation lookup.
    /// Channels in a join pattern are sorted for consistent lookup.
+ fn normalize_channels(channels: &[C]) -> Vec + where + C: Ord, + { + let mut sorted = channels.to_vec(); + sorted.sort(); + sorted + } +} + +impl ChannelStore for VectorChannelStore +where + C: Clone + Eq + Hash + Ord + Send + Sync, + P: Clone + Send + Sync, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + K: Clone + Send + Sync, + DC: DataCollection + Default + Clone + Send + Sync + 'static, + CC: ContinuationCollection + Default + Clone + Send + Sync, +{ + type Channel = C; + type Pattern = P; + type Data = A; + type Continuation = K; + type DataColl = DC; + type ContColl = CC; + + fn get_or_create_data_collection(&mut self, channel: &C) -> &mut DC { + // Try to register the channel if not already known (auto-registration for reducer-created channels) + let index = self.try_register_channel(channel) + .expect("Channel does not match this Vector space pattern"); + + if self.data[index].is_none() { + self.data[index] = Some((self.data_factory)()); + } + self.data[index].as_mut().expect("Data collection should exist after creation") + } + + fn get_data_collection(&self, channel: &C) -> Option<&DC> { + let index = self.get_index(channel)?; + self.data.get(index)?.as_ref() + } + + fn get_data_collection_mut(&mut self, channel: &C) -> Option<&mut DC> { + let index = self.get_index(channel)?; + self.data.get_mut(index)?.as_mut() + } + + fn get_or_create_continuation_collection(&mut self, channels: &[C]) -> &mut CC { + let key = Self::normalize_channels(channels); + self.continuations + .entry(key) + .or_insert_with(|| (self.cont_factory)()) + } + + fn get_continuation_collection(&self, channels: &[C]) -> Option<&CC> { + let key = Self::normalize_channels(channels); + self.continuations.get(&key) + } + + fn get_continuation_collection_mut(&mut self, channels: &[C]) -> Option<&mut CC> { + let key = Self::normalize_channels(channels); + self.continuations.get_mut(&key) + } + + fn all_channels(&self) -> Vec<&C> { + self.index_to_channel.iter().collect() + } + + fn 
gensym(&mut self, _space_id: &SpaceId) -> Result { + let index = self.data.len(); + + // Create new channel from index using the factory + let channel = (self.channel_factory)(&self.space_id, index); + + // Store bidirectional mapping + self.channel_to_index.insert(channel.clone(), index); + self.index_to_channel.push(channel.clone()); + + // Grow data vector + self.data.push(None); + + Ok(channel) + } + + fn get_joins(&self, channel: &C) -> Vec> { + self.joins.get(channel).cloned().unwrap_or_default() + } + + fn put_join(&mut self, channels: Vec) { + for channel in &channels { + self.joins + .entry(channel.clone()) + .or_insert_with(Vec::new) + .push(channels.clone()); + } + } + + fn remove_join(&mut self, channels: &[C]) { + for channel in channels { + if let Some(joins) = self.joins.get_mut(channel) { + joins.retain(|j| j != channels); + } + } + } + + fn snapshot(&self) -> Self { + self.clone() + } + + fn clear(&mut self) { + self.data.clear(); + self.continuations.clear(); + self.joins.clear(); + self.channel_to_index.clear(); + self.index_to_channel.clear(); + } + + fn is_empty(&self) -> bool { + self.data.iter().all(|d| d.is_none()) && self.continuations.is_empty() + } + + fn export_data(&self) -> Vec<(C, DC)> { + self.index_to_channel + .iter() + .enumerate() + .filter_map(|(i, channel)| { + self.data.get(i)?.as_ref().map(|dc| (channel.clone(), dc.clone())) + }) + .collect() + } + + fn export_continuations(&self) -> Vec<(Vec, CC)> { + self.continuations + .iter() + .map(|(cs, cc)| (cs.clone(), cc.clone())) + .collect() + } + + fn export_joins(&self) -> Vec<(C, Vec>)> { + self.joins + .iter() + .map(|(c, js)| (c.clone(), js.clone())) + .collect() + } + + fn for_each_data(&self, mut f: F) + where + F: FnMut(&C, &DC), + { + for (i, channel) in self.index_to_channel.iter().enumerate() { + if let Some(Some(dc)) = self.data.get(i) { + f(channel, dc); + } + } + } + + fn for_each_continuation(&self, mut f: F) + where + F: FnMut(&[C], &CC), + { + for (cs, cc) in 
&self.continuations { + f(cs, cc); + } + } + + fn for_each_join(&self, mut f: F) + where + F: FnMut(&C, &[Vec]), + { + for (c, js) in &self.joins { + f(c, js); + } + } + + fn gensym_counter(&self) -> usize { + self.data.len() + } + + fn import_data(&mut self, data: Vec<(C, DC)>) { + // Clear existing data and mappings + self.data.clear(); + self.channel_to_index.clear(); + self.index_to_channel.clear(); + + // Import new data + for (channel, dc) in data { + let index = self.data.len(); + self.data.push(Some(dc)); + self.channel_to_index.insert(channel.clone(), index); + self.index_to_channel.push(channel); + } + } + + fn import_continuations(&mut self, continuations: Vec<(Vec, CC)>) { + self.continuations.clear(); + for (channels, cc) in continuations { + self.continuations.insert(channels, cc); + } + } + + fn import_joins(&mut self, joins: Vec<(C, Vec>)>) { + self.joins.clear(); + for (channel, join_patterns) in joins { + self.joins.insert(channel, join_patterns); + } + } + + fn set_gensym_counter(&mut self, counter: usize) { + // For Vector, we resize the data vector to match the counter + while self.data.len() < counter { + self.data.push(None); + } + } + + fn allocation_mode(&self) -> AllocationMode { + AllocationMode::VectorIndex + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rust::interpreter::spaces::collections::{BagDataCollection, BagContinuationCollection}; + + /// Factory function for creating usize channels from index + fn usize_channel_factory(_space_id: &SpaceId, index: usize) -> usize { + index + } + + /// Extractor function for getting index from usize channel + fn usize_index_extractor(_space_id: &SpaceId, channel: &usize) -> Option { + Some(*channel) + } + + #[test] + fn test_vector_store_growth() { + let mut store: VectorChannelStore, BagContinuationCollection> = + VectorChannelStore::new( + SpaceId::default_space(), + usize_channel_factory, + usize_index_extractor, + BagDataCollection::new, + BagContinuationCollection::new, + 
); + + let space_id = SpaceId::default_space(); + for i in 0..100 { + let index = store.gensym(&space_id).unwrap(); + assert_eq!(index, i); + } + assert_eq!(store.data.len(), 100); + } + + #[test] + fn test_vector_allocation_mode_returns_vector_index() { + let store: VectorChannelStore, BagContinuationCollection> = + VectorChannelStore::new( + SpaceId::default_space(), + usize_channel_factory, + usize_index_extractor, + BagDataCollection::new, + BagContinuationCollection::new, + ); + + assert_eq!(store.allocation_mode(), AllocationMode::VectorIndex); + } +} diff --git a/rholang/src/rust/interpreter/spaces/channel_store/vectordb_store.rs b/rholang/src/rust/interpreter/spaces/channel_store/vectordb_store.rs new file mode 100644 index 000000000..915f90456 --- /dev/null +++ b/rholang/src/rust/interpreter/spaces/channel_store/vectordb_store.rs @@ -0,0 +1,414 @@ +//! VectorDBChannelStore: Vector database-backed channel storage with similarity search. + +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use super::{ChannelStore, DataCollection, ContinuationCollection, SpaceId, SpaceError}; +use super::super::collections::{EmbeddingType, SimilarityMetric, VectorDBDataCollection, BagContinuationCollection}; + +/// VectorDB-specialized channel store that creates properly configured +/// VectorDBDataCollections. +/// +/// This store holds VectorDB configuration (dimensions, threshold, backend_name, +/// metric, embedding_type) and creates VectorDBDataCollections with those settings +/// when new channels are accessed. 
+/// +/// # Type Parameters +/// - `C`: Channel type +/// - `A`: Data type stored in VectorDB collections +/// - `P`: Pattern type for continuations +/// - `K`: Continuation type +#[derive(Debug)] +pub struct VectorDBChannelStore +where + C: Clone + Eq + Hash, + A: Clone + Send + Sync, + P: Clone + Send + Sync, + K: Clone + Send + Sync, +{ + /// Data collections indexed by channel + data: HashMap>, + + /// Continuation collections indexed by channel pattern (sorted channel vec) + continuations: HashMap, BagContinuationCollection>, + + /// Join patterns: channel -> list of join patterns it participates in + joins: HashMap>>, + + /// Counter for generating unique channel names + gensym_counter: AtomicUsize, + + /// VectorDB configuration: number of embedding dimensions + dimensions: usize, + + /// VectorDB configuration: similarity threshold (0.0 to 1.0) + threshold: f32, + + /// VectorDB configuration: backend name (e.g., "rho", "default", "pinecone") + backend_name: String, + + /// VectorDB configuration: similarity metric (None = use backend default) + metric: Option, + + /// VectorDB configuration: expected embedding format from Rholang + embedding_type: EmbeddingType, +} + +impl Clone for VectorDBChannelStore +where + C: Clone + Eq + Hash, + A: Clone + Send + Sync, + P: Clone + Send + Sync, + K: Clone + Send + Sync, +{ + fn clone(&self) -> Self { + VectorDBChannelStore { + data: self.data.clone(), + continuations: self.continuations.clone(), + joins: self.joins.clone(), + gensym_counter: AtomicUsize::new(self.gensym_counter.load(Ordering::SeqCst)), + dimensions: self.dimensions, + threshold: self.threshold, + backend_name: self.backend_name.clone(), + metric: self.metric, + embedding_type: self.embedding_type, + } + } +} + +impl VectorDBChannelStore +where + C: Clone + Eq + Hash + Send + Sync, + A: Clone + Send + Sync, + P: Clone + Send + Sync, + K: Clone + Send + Sync, +{ + /// Create a new VectorDB channel store with the given configuration. 
+ /// + /// # Arguments + /// - `dimensions`: The dimensionality of embedding vectors + /// - `threshold`: Default similarity threshold (0.0 to 1.0) + /// - `metric`: Similarity metric to use (None = use backend default based on embedding_type) + /// - `backend_name`: Backend to use (e.g., "rho", "default", "pinecone") + /// - `embedding_type`: Expected embedding format from Rholang + pub fn new( + dimensions: usize, + threshold: f32, + metric: Option, + backend_name: impl Into, + embedding_type: EmbeddingType, + ) -> Self { + VectorDBChannelStore { + data: HashMap::new(), + continuations: HashMap::new(), + joins: HashMap::new(), + gensym_counter: AtomicUsize::new(0), + dimensions, + threshold, + backend_name: backend_name.into(), + metric, + embedding_type, + } + } + + /// Create with default metric and embedding type. + pub fn with_dimensions(dimensions: usize) -> Self { + Self::new( + dimensions, + 0.8, + None, // Let backend decide based on embedding_type + "rho", + EmbeddingType::Integer, // Default to integer for Rholang (0-100 scale) + ) + } + + /// Create with dimensions and threshold. + pub fn with_threshold(dimensions: usize, threshold: f32) -> Self { + Self::new( + dimensions, + threshold, + None, // Let backend decide based on embedding_type + "rho", + EmbeddingType::Integer, + ) + } + + /// Create a normalized key for continuation lookup. + fn normalize_channels(channels: &[C]) -> Vec + where + C: Ord, + { + let mut sorted = channels.to_vec(); + sorted.sort(); + sorted + } + + /// Get the configured dimensions. + pub fn dimensions(&self) -> usize { + self.dimensions + } + + /// Get the configured threshold. + pub fn threshold(&self) -> f32 { + self.threshold + } + + /// Get the configured metric (None means backend decides based on embedding_type). + pub fn metric(&self) -> Option { + self.metric + } + + /// Get the configured backend name. + pub fn backend_name(&self) -> &str { + &self.backend_name + } + + /// Get the configured embedding type. 
+ pub fn embedding_type(&self) -> EmbeddingType { + self.embedding_type + } +} + +impl ChannelStore for VectorDBChannelStore +where + C: Clone + Eq + Hash + Ord + Send + Sync + From + 'static, + A: Clone + Send + Sync + std::fmt::Debug + 'static, + P: Clone + PartialEq + Send + Sync + 'static, + K: Clone + Send + Sync + 'static, +{ + type Channel = C; + type Pattern = P; + type Data = A; + type Continuation = K; + type DataColl = VectorDBDataCollection; + type ContColl = BagContinuationCollection; + + fn get_or_create_data_collection(&mut self, channel: &C) -> &mut VectorDBDataCollection { + let dimensions = self.dimensions; + let threshold = self.threshold; + let metric = self.metric; + let backend_name = self.backend_name.clone(); + let embedding_type = self.embedding_type; + + self.data.entry(channel.clone()).or_insert_with(|| { + VectorDBDataCollection::with_config(&backend_name, dimensions, threshold, metric, embedding_type) + }) + } + + fn get_data_collection(&self, channel: &C) -> Option<&VectorDBDataCollection> { + self.data.get(channel) + } + + fn get_data_collection_mut(&mut self, channel: &C) -> Option<&mut VectorDBDataCollection> { + self.data.get_mut(channel) + } + + fn get_or_create_continuation_collection( + &mut self, + channels: &[C], + ) -> &mut BagContinuationCollection { + let key = Self::normalize_channels(channels); + self.continuations + .entry(key) + .or_insert_with(BagContinuationCollection::new) + } + + fn get_continuation_collection( + &self, + channels: &[C], + ) -> Option<&BagContinuationCollection> { + let key = Self::normalize_channels(channels); + self.continuations.get(&key) + } + + fn get_continuation_collection_mut( + &mut self, + channels: &[C], + ) -> Option<&mut BagContinuationCollection> { + let key = Self::normalize_channels(channels); + self.continuations.get_mut(&key) + } + + fn all_channels(&self) -> Vec<&C> { + self.data.keys().collect() + } + + fn gensym(&mut self, _space_id: &SpaceId) -> Result { + let counter = 
self.gensym_counter.fetch_add(1, Ordering::SeqCst); + Ok(C::from(counter)) + } + + fn get_joins(&self, channel: &C) -> Vec> { + self.joins.get(channel).cloned().unwrap_or_default() + } + + fn put_join(&mut self, channels: Vec) { + for channel in &channels { + self.joins + .entry(channel.clone()) + .or_default() + .push(channels.clone()); + } + } + + fn remove_join(&mut self, channels: &[C]) { + for channel in channels { + if let Some(patterns) = self.joins.get_mut(channel) { + patterns.retain(|p| p != channels); + } + } + } + + fn snapshot(&self) -> Self { + self.clone() + } + + fn clear(&mut self) { + self.data.clear(); + self.continuations.clear(); + self.joins.clear(); + } + + fn is_empty(&self) -> bool { + self.data.is_empty() && self.continuations.is_empty() + } + + fn export_data(&self) -> Vec<(C, VectorDBDataCollection)> { + self.data + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect() + } + + fn export_continuations(&self) -> Vec<(Vec, BagContinuationCollection)> { + self.continuations + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect() + } + + fn export_joins(&self) -> Vec<(C, Vec>)> { + self.joins + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect() + } + + fn for_each_data(&self, mut f: F) + where + F: FnMut(&C, &VectorDBDataCollection), + { + for (c, dc) in &self.data { + f(c, dc); + } + } + + fn for_each_continuation(&self, mut f: F) + where + F: FnMut(&[C], &BagContinuationCollection), + { + for (cs, cc) in &self.continuations { + f(cs, cc); + } + } + + fn for_each_join(&self, mut f: F) + where + F: FnMut(&C, &[Vec]), + { + for (c, js) in &self.joins { + f(c, js); + } + } + + fn gensym_counter(&self) -> usize { + self.gensym_counter.load(Ordering::SeqCst) + } + + fn import_data(&mut self, data: Vec<(C, VectorDBDataCollection)>) { + self.data.clear(); + for (channel, collection) in data { + self.data.insert(channel, collection); + } + } + + fn import_continuations(&mut self, continuations: Vec<(Vec, 
BagContinuationCollection)>) { + self.continuations.clear(); + for (channels, collection) in continuations { + self.continuations.insert(channels, collection); + } + } + + fn import_joins(&mut self, joins: Vec<(C, Vec>)>) { + self.joins.clear(); + for (channel, join_patterns) in joins { + self.joins.insert(channel, join_patterns); + } + } + + fn set_gensym_counter(&mut self, counter: usize) { + self.gensym_counter.store(counter, Ordering::SeqCst); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_vectordb_store_basic() { + let mut store: VectorDBChannelStore = + VectorDBChannelStore::with_dimensions(128); + + let channel = 42usize; + let dc = store.get_or_create_data_collection(&channel); + + // VectorDB data collection starts empty + assert_eq!(dc.len(), 0); + } + + #[test] + fn test_vectordb_gensym() { + let mut store: VectorDBChannelStore = + VectorDBChannelStore::with_dimensions(128); + + let space_id = SpaceId::default_space(); + let c1 = store.gensym(&space_id).expect("gensym should succeed"); + let c2 = store.gensym(&space_id).expect("gensym should succeed"); + let c3 = store.gensym(&space_id).expect("gensym should succeed"); + + assert_eq!(c1, 0); + assert_eq!(c2, 1); + assert_eq!(c3, 2); + } + + #[test] + fn test_vectordb_configuration() { + let store: VectorDBChannelStore = + VectorDBChannelStore::new( + 256, + 0.9, + Some(SimilarityMetric::Euclidean), + "rho", + EmbeddingType::Float, + ); + + assert_eq!(store.dimensions(), 256); + assert!((store.threshold() - 0.9).abs() < f32::EPSILON); + assert_eq!(store.metric(), Some(SimilarityMetric::Euclidean)); + assert_eq!(store.embedding_type(), EmbeddingType::Float); + assert_eq!(store.backend_name(), "rho"); + } + + #[test] + fn test_vectordb_with_threshold() { + let store: VectorDBChannelStore = + VectorDBChannelStore::with_threshold(512, 0.75); + + assert_eq!(store.dimensions(), 512); + assert!((store.threshold() - 0.75).abs() < f32::EPSILON); + assert_eq!(store.metric(), None); // 
Backend decides based on embedding_type + assert_eq!(store.embedding_type(), EmbeddingType::Integer); + } +} diff --git a/rholang/src/rust/interpreter/spaces/mod.rs b/rholang/src/rust/interpreter/spaces/mod.rs index 5fb712092..2007834fe 100644 --- a/rholang/src/rust/interpreter/spaces/mod.rs +++ b/rholang/src/rust/interpreter/spaces/mod.rs @@ -1,8 +1,9 @@ -//! Spaces Module - Foundation Traits & Collections Structure +//! Spaces Module - Foundation Traits, Collections & Outer Storage //! -//! This module provides the foundation traits for reified RSpaces. +//! This module provides the foundation for reified RSpaces. pub mod collections; +pub mod channel_store; pub mod errors; pub mod matcher; pub mod types; @@ -16,5 +17,8 @@ pub use types::{ ContinuationBound, SpaceParamBound, SpaceId, + InnerCollectionType, + OuterStorageType, }; pub use errors::SpaceError; +pub use channel_store::ChannelStore; diff --git a/rholang/src/rust/interpreter/spaces/types/mod.rs b/rholang/src/rust/interpreter/spaces/types/mod.rs index 0e34c7891..85b03becb 100644 --- a/rholang/src/rust/interpreter/spaces/types/mod.rs +++ b/rholang/src/rust/interpreter/spaces/types/mod.rs @@ -3,7 +3,9 @@ pub mod bounds; pub mod id; pub mod collections; +pub mod pathmap; pub use bounds::*; pub use id::SpaceId; pub use collections::{InnerCollectionType, OuterStorageType}; +pub use pathmap::{PathMapKeyConversion, PathTrie}; diff --git a/rholang/src/rust/interpreter/spaces/types/pathmap.rs b/rholang/src/rust/interpreter/spaces/types/pathmap.rs new file mode 100644 index 000000000..1d1e8397a --- /dev/null +++ b/rholang/src/rust/interpreter/spaces/types/pathmap.rs @@ -0,0 +1,760 @@ +//! PathMap Prefix Aggregation and Path Encoding +//! +//! This module provides the path encoding scheme for PathMap prefix semantics +//! with Rholang channels. It supports converting between Rholang Par expressions +//! and byte paths for hierarchical channel indexing. 
+ +use std::fmt; + +use models::rhoapi::{expr::ExprInstance, EList, Expr, Par}; + +// ========================================================================== +// PathMap Prefix Aggregation Types +// ========================================================================== + +/// Suffix path stripped during prefix aggregation. +/// +/// When data at path `@[0, 1, 2]` is viewed from prefix `@[0, 1]`, the suffix +/// is `[2]`. This suffix becomes part of the aggregated data as a key. +/// +/// # Spec Reference +/// From "Reifying RSpaces" lines 159-192: +/// ```text +/// @[0, 1, 2]!({|"hi"|}) | @[0, 1, 2]!({|"hello"|}) | @[0, 1, 3]!({|"there"|}) +/// = @[0, 1]!({|[2, "hi"], [2, "hello"], [3, "there"]|}) +/// ``` +/// +/// The suffix `[2]` is prepended to the data when viewed at `@[0, 1]`. +pub type SuffixKey = Vec; + +/// Aggregated data item with its suffix key for PathMap prefix semantics. +/// +/// When consuming at a prefix path, data from descendant paths is aggregated +/// with their relative suffix keys attached. This allows pattern matching +/// to distinguish data from different child paths. +/// +/// # Example +/// ```ignore +/// // Data at @[0, 1, 2] viewed at @[0, 1]: +/// AggregatedDatum { +/// suffix_key: vec![2], // The suffix [2] +/// data: "hi", +/// persist: false, +/// } +/// ``` +/// +/// # Formal Correspondence +/// - `PathMapStore.v`: `send_visible_from_prefix` theorem +/// - `PathMapQuantale.v`: Path concatenation properties +#[derive(Clone, Debug, PartialEq)] +pub struct AggregatedDatum { + /// The suffix path relative to the consuming prefix. + /// Empty for exact-path matches (data at the same path as the consumer). + pub suffix_key: SuffixKey, + + /// The actual data being aggregated. + pub data: A, + + /// Whether this data persists after consumption. + pub persist: bool, +} + +impl AggregatedDatum { + /// Create a new aggregated datum with the given suffix key. 
+ pub fn new(suffix_key: SuffixKey, data: A, persist: bool) -> Self { + AggregatedDatum { + suffix_key, + data, + persist, + } + } + + /// Create an aggregated datum with empty suffix (exact path match). + pub fn exact(data: A, persist: bool) -> Self { + AggregatedDatum { + suffix_key: Vec::new(), + data, + persist, + } + } + + /// Check if this datum is from an exact path match (no suffix). + pub fn is_exact_match(&self) -> bool { + self.suffix_key.is_empty() + } + + /// Get the suffix key length (depth of path difference). + pub fn suffix_depth(&self) -> usize { + self.suffix_key.len() + } + + /// Map the data through a function, preserving suffix and persist. + pub fn map B>(self, f: F) -> AggregatedDatum { + AggregatedDatum { + suffix_key: self.suffix_key, + data: f(self.data), + persist: self.persist, + } + } +} + +impl AggregatedDatum { + /// Get the data, cloning it. + pub fn data_cloned(&self) -> A { + self.data.clone() + } +} + +impl fmt::Display for AggregatedDatum { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.suffix_key.is_empty() { + write!(f, "{}", self.data) + } else { + write!(f, "[{:?}, {}]", self.suffix_key, self.data) + } + } +} + +// ========================================================================== +// Path Prefix Utilities +// ========================================================================== + +/// Compute the suffix of a child path relative to a prefix path. +/// +/// Returns `None` if `prefix` is not a prefix of `child`. 
+/// +/// # Example +/// ```ignore +/// assert_eq!(get_path_suffix(&[0, 1], &[0, 1, 2, 3]), Some(vec![2, 3])); +/// assert_eq!(get_path_suffix(&[0, 1], &[0, 2]), None); +/// assert_eq!(get_path_suffix(&[0, 1], &[0, 1]), Some(vec![])); +/// ``` +pub fn get_path_suffix(prefix: &[u8], child: &[u8]) -> Option { + if child.len() >= prefix.len() && child.starts_with(prefix) { + Some(child[prefix.len()..].to_vec()) + } else { + None + } +} + +/// Generate all prefixes of a path, from shortest to longest. +/// +/// # Example +/// ```ignore +/// assert_eq!( +/// path_prefixes(&[0, 1, 2]), +/// vec![vec![0], vec![0, 1], vec![0, 1, 2]] +/// ); +/// ``` +pub fn path_prefixes(path: &[u8]) -> Vec> { + (1..=path.len()) + .map(|len| path[..len].to_vec()) + .collect() +} + +/// Check if `prefix` is a prefix of `path`. +pub fn is_path_prefix(prefix: &[u8], path: &[u8]) -> bool { + path.len() >= prefix.len() && path.starts_with(prefix) +} + +/// Get byte offsets at element boundaries in a tagged path. +/// +/// For tagged paths, each element has variable length: +/// - INTEGER (0x01): 9 bytes (tag + 8-byte i64) +/// - STRING (0x02): 1 + varint_len + data_len bytes +/// - BYTE_ARRAY (0x03): 1 + varint_len + data_len bytes +/// +/// Returns byte offsets at element boundaries (after each complete element). +/// Only complete elements are included; partial elements at the end are ignored. 
+/// +/// # Example +/// ```ignore +/// // Integer path [0, 1, 2] = 27 bytes (3 integers × 9 bytes each) +/// let path = par_to_path(&create_int_elist_par(vec![0, 1, 2])).unwrap(); +/// let boundaries = path_element_boundaries(&path); +/// assert_eq!(boundaries, vec![9, 18, 27]); +/// +/// // Generates prefixes at element boundaries: +/// // - path[..9] = @[0] +/// // - path[..18] = @[0, 1] +/// // - path[..27] = @[0, 1, 2] +/// ``` +pub fn path_element_boundaries(path: &[u8]) -> Vec { + let mut boundaries = Vec::new(); + let mut offset = 0; + + while offset < path.len() { + if let Some(element_size) = get_element_size(&path[offset..]) { + offset += element_size; + boundaries.push(offset); + } else { + break; // Invalid or incomplete element, stop parsing + } + } + + boundaries +} + +/// Get the size of a path element without fully decoding it. +/// +/// Returns `Some(size)` if a valid element starts at the given bytes, +/// `None` if the element is invalid or incomplete. +fn get_element_size(bytes: &[u8]) -> Option { + if bytes.is_empty() { + return None; + } + + let tag = bytes[0]; + let rest = &bytes[1..]; + + match tag { + path_tags::INTEGER => { + // INTEGER: tag (1) + i64 LE (8) = 9 bytes + if rest.len() >= 8 { + Some(9) + } else { + None // Incomplete integer + } + } + path_tags::STRING | path_tags::BYTE_ARRAY => { + // STRING or BYTE_ARRAY: tag (1) + varint length + data + let (len, varint_size) = decode_varint_size(rest)?; + let total = 1 + varint_size + len; + if bytes.len() >= total { + Some(total) + } else { + None // Incomplete string/byte array + } + } + _ => None, // Unknown tag + } +} + +/// Decode a varint and return just the value and bytes consumed. +/// Helper for `get_element_size` to avoid duplicating varint logic. 
+fn decode_varint_size(bytes: &[u8]) -> Option<(usize, usize)> { + let mut result: u64 = 0; + let mut shift: u32 = 0; + let mut consumed = 0; + + for &byte in bytes { + consumed += 1; + result |= ((byte & 0x7F) as u64) << shift; + if byte & 0x80 == 0 { + return Some((result as usize, consumed)); + } + shift += 7; + if shift > 63 { + return None; // Overflow protection + } + } + None // Incomplete varint +} + +// ========================================================================== +// Par-to-Path Conversion (for PathMap prefix semantics with Rholang) +// ========================================================================== +// +// Path Encoding Scheme (Varint + MORK Style) +// ------------------------------------------ +// Each path element is encoded with a type tag followed by data: +// +// Tag bytes: +// 0x01 = Integer (i64, 8 bytes little-endian) +// 0x02 = String (varint length + UTF-8 bytes) +// 0x03 = ByteArray (varint length + raw bytes) +// +// Varint encoding: 7 bits per byte, high bit (0x80) = continuation flag +// +// Examples: +// Integer 42 → [0x01, 42, 0, 0, 0, 0, 0, 0, 0] (tag + i64 LE) +// String "auth" → [0x02, 4, 'a', 'u', 't', 'h'] (tag + varint_len + utf8) +// String (300 chars) → [0x02, 0xAC, 0x02, ...] (varint 300 = 0xAC 0x02) +// +// Prefix semantics are preserved because string lengths are encoded: +// @["auth"] is NOT a prefix of @["author"] since their lengths differ. +// +// Backward compatibility: Legacy paths (single-byte integers 0-255) are detected +// by checking if the first byte is NOT a valid tag (0x01, 0x02, 0x03). +// ========================================================================== + +/// Tag bytes for path element encoding. 
+pub mod path_tags { + /// Integer (i64, 8 bytes little-endian) + pub const INTEGER: u8 = 0x01; + /// String (varint length + UTF-8 bytes) + pub const STRING: u8 = 0x02; + /// ByteArray (varint length + raw bytes) + pub const BYTE_ARRAY: u8 = 0x03; +} + +/// Encode a varint (variable-length integer) into a buffer. +/// +/// Uses 7 bits per byte with the high bit (0x80) as a continuation flag. +/// This matches the encoding used by MeTTaTron for compatibility. +/// +/// # Example +/// ```ignore +/// let mut buf = Vec::new(); +/// encode_varint(&mut buf, 127); // [127] +/// encode_varint(&mut buf, 128); // [0x80, 0x01] +/// encode_varint(&mut buf, 300); // [0xAC, 0x02] +/// ``` +pub fn encode_varint(buf: &mut Vec, mut n: u64) { + while n >= 0x80 { + buf.push((n as u8) | 0x80); + n >>= 7; + } + buf.push(n as u8); +} + +/// Decode a varint from a byte slice. +/// +/// Returns `Some((value, bytes_consumed))` on success, `None` on error. +/// +/// # Example +/// ```ignore +/// assert_eq!(decode_varint(&[127]), Some((127, 1))); +/// assert_eq!(decode_varint(&[0x80, 0x01]), Some((128, 2))); +/// assert_eq!(decode_varint(&[0xAC, 0x02]), Some((300, 2))); +/// ``` +pub fn decode_varint(bytes: &[u8]) -> Option<(u64, usize)> { + let mut result: u64 = 0; + let mut shift: u32 = 0; + let mut consumed = 0; + + for &byte in bytes { + consumed += 1; + result |= ((byte & 0x7F) as u64) << shift; + if byte & 0x80 == 0 { + return Some((result, consumed)); + } + shift += 7; + if shift > 63 { + return None; // Overflow protection + } + } + None // Incomplete varint +} + +/// Encode a single path element (Par) into the buffer. +/// +/// Supports GInt, GString, and GByteArray types. +/// Returns `true` if encoding succeeded, `false` if the element type is unsupported. 
+fn encode_path_element(buf: &mut Vec, par: &Par) -> bool { + for expr in &par.exprs { + match &expr.expr_instance { + Some(ExprInstance::GInt(n)) => { + buf.push(path_tags::INTEGER); + buf.extend_from_slice(&n.to_le_bytes()); + return true; + } + Some(ExprInstance::GString(s)) => { + buf.push(path_tags::STRING); + encode_varint(buf, s.len() as u64); + buf.extend_from_slice(s.as_bytes()); + return true; + } + Some(ExprInstance::GByteArray(bytes)) => { + buf.push(path_tags::BYTE_ARRAY); + encode_varint(buf, bytes.len() as u64); + buf.extend_from_slice(bytes); + return true; + } + _ => continue, + } + } + false +} + +/// Decode a single path element from bytes. +/// +/// Returns `Some((par, bytes_consumed))` on success, `None` on error. +fn decode_path_element(bytes: &[u8]) -> Option<(Par, usize)> { + if bytes.is_empty() { + return None; + } + + let tag = bytes[0]; + let rest = &bytes[1..]; + + match tag { + path_tags::INTEGER => { + if rest.len() < 8 { + return None; + } + let n = i64::from_le_bytes(rest[..8].try_into().ok()?); + let par = Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::GInt(n)), + }]); + Some((par, 9)) // tag + 8 bytes + } + path_tags::STRING => { + let (len, varint_size) = decode_varint(rest)?; + let len = len as usize; + let data_start = varint_size; + if rest.len() < data_start + len { + return None; + } + let s = std::str::from_utf8(&rest[data_start..data_start + len]).ok()?; + let par = Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::GString(s.to_string())), + }]); + Some((par, 1 + data_start + len)) // tag + varint + data + } + path_tags::BYTE_ARRAY => { + let (len, varint_size) = decode_varint(rest)?; + let len = len as usize; + let data_start = varint_size; + if rest.len() < data_start + len { + return None; + } + let bytes_data = rest[data_start..data_start + len].to_vec(); + let par = Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::GByteArray(bytes_data)), + 
}]); + Some((par, 1 + data_start + len)) // tag + varint + data + } + _ => None, // Unknown tag + } +} + +/// Check if a byte path uses the new tagged encoding format. +/// +/// New format starts with a valid tag (0x01, 0x02, 0x03). +/// Legacy format uses raw bytes (integers 0-255). +fn is_tagged_path(bytes: &[u8]) -> bool { + if bytes.is_empty() { + return false; + } + matches!(bytes[0], path_tags::INTEGER | path_tags::STRING | path_tags::BYTE_ARRAY) +} + +/// Convert a Rholang Par representing a path to a byte path. +/// +/// For PathMap prefix semantics to work with Rholang channels, we need to convert +/// Par channels like `@[0, 1, 2]` or `@["sys", "auth"]` to byte paths. +/// +/// # Supported Path Formats +/// - `@[0, 1, 2]` → Integer path (tagged encoding) +/// - `@["sys", "auth"]` → String path (tagged encoding with length prefixes) +/// - `@[0, "auth", 2]` → Mixed path (tagged encoding) +/// +/// # Invalid Formats (returns None) +/// - `@"string"` - Not a list +/// - Any EList element that isn't GInt, GString, or GByteArray +/// +/// # Prefix Semantics +/// String paths preserve prefix semantics because lengths are encoded: +/// - `@["auth"]` encodes as `[0x02, 4, 'a', 'u', 't', 'h']` +/// - `@["author"]` encodes as `[0x02, 6, 'a', 'u', 't', 'h', 'o', 'r']` +/// These are NOT prefix-related because the length bytes differ. 
+/// +/// # Example +/// ```ignore +/// let par = create_string_elist_par(vec!["sys", "auth"]); +/// let path = par_to_path(&par); +/// assert!(path.is_some()); +/// ``` +pub fn par_to_path(par: &Par) -> Option> { + // Look for an EList expression in the Par + for expr in &par.exprs { + if let Some(ExprInstance::EListBody(elist)) = &expr.expr_instance { + return elist_to_path(elist); + } + } + + // Also check if it's a single element (path of length 1) + // Try to encode it as a single-element path + let mut buf = Vec::new(); + if encode_path_element(&mut buf, par) { + return Some(buf); + } + + None +} + +/// Convert an EList to a byte path with tagged encoding. +fn elist_to_path(elist: &EList) -> Option> { + // Pre-allocate with estimate: 9 bytes per integer, variable for strings + let mut path = Vec::with_capacity(elist.ps.len() * 9); + + for p in &elist.ps { + if !encode_path_element(&mut path, p) { + return None; // Unsupported element type + } + } + + Some(path) +} + +/// Convert a byte path to a Rholang Par (EList). +/// +/// This is the inverse of `par_to_path`. It decodes tagged elements and +/// creates a Par with an EList containing the appropriate expression types. 
+/// +/// # Backward Compatibility +/// Detects legacy format (raw bytes) vs new tagged format: +/// - If first byte is a valid tag (0x01, 0x02, 0x03), decode as tagged +/// - Otherwise, treat as legacy integer-only format (each byte = one GInt 0-255) +/// +/// # Example +/// ```ignore +/// // New format +/// let path = vec![0x02, 4, b'a', b'u', b't', b'h']; // String "auth" +/// let par = path_to_par(&path); +/// // par is equivalent to @["auth"] +/// +/// // Legacy format (backward compatible) +/// let legacy = vec![0, 1, 2]; +/// let par_legacy = path_to_par(&legacy); +/// // par_legacy is equivalent to @[0, 1, 2] +/// ``` +pub fn path_to_par(path: &[u8]) -> Par { + if path.is_empty() { + return Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::EListBody(EList { + ps: vec![], + locally_free: vec![], + connective_used: false, + remainder: None, + })), + }]); + } + + // Check if this is the new tagged format + if is_tagged_path(path) { + if let Some(elements) = decode_tagged_path(path) { + return Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::EListBody(EList { + ps: elements, + locally_free: vec![], + connective_used: false, + remainder: None, + })), + }]); + } + } + + // Legacy format: each byte is a GInt 0-255 + let elements: Vec = path + .iter() + .map(|&byte| { + Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::GInt(byte as i64)), + }]) + }) + .collect(); + + Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::EListBody(EList { + ps: elements, + locally_free: vec![], + connective_used: false, + remainder: None, + })), + }]) +} + +/// Decode a tagged path into a list of Par elements. 
+fn decode_tagged_path(path: &[u8]) -> Option> { + let mut elements = Vec::new(); + let mut offset = 0; + + while offset < path.len() { + let (par, consumed) = decode_path_element(&path[offset..])?; + elements.push(par); + offset += consumed; + } + + Some(elements) +} + +/// Check if a Par represents a valid path (EList of integers 0-255). +/// +/// This is a convenience function for checking if prefix semantics can apply. +pub fn is_par_path(par: &Par) -> bool { + par_to_path(par).is_some() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_path_suffix_valid() { + assert_eq!(get_path_suffix(&[0, 1], &[0, 1, 2]), Some(vec![2])); + assert_eq!(get_path_suffix(&[0, 1], &[0, 1, 2, 3]), Some(vec![2, 3])); + assert_eq!(get_path_suffix(&[0, 1], &[0, 1]), Some(vec![])); + assert_eq!(get_path_suffix(&[], &[0, 1, 2]), Some(vec![0, 1, 2])); + } + + #[test] + fn test_get_path_suffix_invalid() { + assert_eq!(get_path_suffix(&[0, 1], &[0, 2]), None); + assert_eq!(get_path_suffix(&[0, 1, 2], &[0, 1]), None); + assert_eq!(get_path_suffix(&[1, 2, 3], &[4, 5, 6]), None); + } + + #[test] + fn test_path_prefixes() { + let prefixes = path_prefixes(&[0, 1, 2]); + assert_eq!(prefixes, vec![vec![0], vec![0, 1], vec![0, 1, 2]]); + + let prefixes = path_prefixes(&[5]); + assert_eq!(prefixes, vec![vec![5]]); + + let prefixes = path_prefixes(&[]); + assert!(prefixes.is_empty()); + } + + #[test] + fn test_is_path_prefix() { + assert!(is_path_prefix(&[0, 1], &[0, 1, 2])); + assert!(is_path_prefix(&[0, 1], &[0, 1])); + assert!(is_path_prefix(&[], &[0, 1, 2])); + + assert!(!is_path_prefix(&[0, 1], &[0, 2])); + assert!(!is_path_prefix(&[0, 1, 2], &[0, 1])); + } + + #[test] + fn test_aggregated_datum_new() { + let datum: AggregatedDatum = AggregatedDatum::new( + vec![2], + "hello".to_string(), + false, + ); + + assert_eq!(datum.suffix_key, vec![2]); + assert_eq!(datum.data, "hello"); + assert!(!datum.persist); + assert!(!datum.is_exact_match()); + 
assert_eq!(datum.suffix_depth(), 1); + } + + #[test] + fn test_aggregated_datum_exact() { + let datum: AggregatedDatum = AggregatedDatum::exact(42, true); + + assert!(datum.suffix_key.is_empty()); + assert_eq!(datum.data, 42); + assert!(datum.persist); + assert!(datum.is_exact_match()); + assert_eq!(datum.suffix_depth(), 0); + } + + #[test] + fn test_aggregated_datum_map() { + let datum: AggregatedDatum = AggregatedDatum::new(vec![1, 2], 10, true); + let mapped = datum.map(|x| x * 2); + + assert_eq!(mapped.suffix_key, vec![1, 2]); + assert_eq!(mapped.data, 20); + assert!(mapped.persist); + } + + #[test] + fn test_varint_encode_small() { + let mut buf = Vec::new(); + encode_varint(&mut buf, 0); + assert_eq!(buf, vec![0]); + + let mut buf = Vec::new(); + encode_varint(&mut buf, 127); + assert_eq!(buf, vec![127]); + } + + #[test] + fn test_varint_encode_large() { + let mut buf = Vec::new(); + encode_varint(&mut buf, 128); + assert_eq!(buf, vec![0x80, 0x01]); + + let mut buf = Vec::new(); + encode_varint(&mut buf, 300); + assert_eq!(buf, vec![0xAC, 0x02]); + } + + #[test] + fn test_varint_roundtrip() { + for n in [0, 1, 127, 128, 255, 256, 300, 1000, 16383, 16384, 100000] { + let mut buf = Vec::new(); + encode_varint(&mut buf, n); + let (decoded, _) = decode_varint(&buf).expect("varint decode failed"); + assert_eq!(decoded, n, "Roundtrip failed for {}", n); + } + } + + #[test] + fn test_par_to_path_integer_list() { + let elements: Vec = vec![0i64, 1, 2] + .into_iter() + .map(|n| Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::GInt(n)), + }])) + .collect(); + + let par = Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::EListBody(EList { + ps: elements, + locally_free: vec![], + connective_used: false, + remainder: None, + })), + }]); + + let path = par_to_path(&par).expect("par_to_path failed"); + + assert_eq!(path.len(), 27); + assert_eq!(path[0], path_tags::INTEGER); + assert_eq!(path[9], path_tags::INTEGER); + 
assert_eq!(path[18], path_tags::INTEGER); + } + + #[test] + fn test_par_to_path_string_list() { + let elements: Vec = vec!["sys", "auth"] + .into_iter() + .map(|s| Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::GString(s.to_string())), + }])) + .collect(); + + let par = Par::default().with_exprs(vec![Expr { + expr_instance: Some(ExprInstance::EListBody(EList { + ps: elements, + locally_free: vec![], + connective_used: false, + remainder: None, + })), + }]); + + let path = par_to_path(&par).expect("par_to_path failed"); + + assert_eq!(path.len(), 11); + assert_eq!(path[0], path_tags::STRING); + assert_eq!(path[1], 3); // length of "sys" + assert_eq!(path[5], path_tags::STRING); + assert_eq!(path[6], 4); // length of "auth" + } + + #[test] + fn test_path_to_par_legacy_format() { + let legacy_path = vec![0u8, 4, 5]; + let par = path_to_par(&legacy_path); + + if let Some(ExprInstance::EListBody(elist)) = &par.exprs[0].expr_instance { + assert_eq!(elist.ps.len(), 3); + for (i, expected) in [0i64, 4, 5].iter().enumerate() { + if let Some(ExprInstance::GInt(n)) = &elist.ps[i].exprs[0].expr_instance { + assert_eq!(*n, *expected); + } else { + panic!("Expected GInt at position {}", i); + } + } + } else { + panic!("Expected EListBody"); + } + } +}