|
| 1 | +use crate::execution::dql::join::hash::{filter, FilterArgs, JoinProbeState, ProbeArgs}; |
| 2 | +use crate::execution::dql::join::hash_join::BuildState; |
| 3 | +use crate::execution::dql::sort::BumpVec; |
| 4 | +use crate::execution::Executor; |
| 5 | +use crate::throw; |
| 6 | +use crate::types::tuple::Tuple; |
| 7 | +use crate::types::value::DataValue; |
| 8 | +use ahash::HashMap; |
| 9 | +use fixedbitset::FixedBitSet; |
| 10 | + |
| 11 | +pub(crate) struct FullJoinState { |
| 12 | + pub(crate) left_schema_len: usize, |
| 13 | + pub(crate) right_schema_len: usize, |
| 14 | + pub(crate) bits: FixedBitSet, |
| 15 | +} |
| 16 | + |
| 17 | +impl<'a> JoinProbeState<'a> for FullJoinState { |
| 18 | + fn probe( |
| 19 | + &mut self, |
| 20 | + probe_args: ProbeArgs<'a>, |
| 21 | + filter_args: Option<&'a FilterArgs>, |
| 22 | + ) -> Executor<'a> { |
| 23 | + let left_schema_len = self.left_schema_len; |
| 24 | + let bits_ptr: *mut FixedBitSet = &mut self.bits; |
| 25 | + |
| 26 | + Box::new( |
| 27 | + #[coroutine] |
| 28 | + move || { |
| 29 | + let ProbeArgs { probe_tuple, .. } = probe_args; |
| 30 | + |
| 31 | + if let ProbeArgs { |
| 32 | + is_keys_has_null: false, |
| 33 | + build_state: Some(build_state), |
| 34 | + .. |
| 35 | + } = probe_args |
| 36 | + { |
| 37 | + let mut has_filtered = false; |
| 38 | + for (i, Tuple { values, pk }) in build_state.tuples.iter() { |
| 39 | + let full_values = |
| 40 | + Vec::from_iter(values.iter().chain(probe_tuple.values.iter()).cloned()); |
| 41 | + |
| 42 | + match &filter_args { |
| 43 | + None => (), |
| 44 | + Some(filter_args) => { |
| 45 | + if !throw!(filter(&full_values, filter_args)) { |
| 46 | + has_filtered = true; |
| 47 | + unsafe { |
| 48 | + (*bits_ptr).set(*i, true); |
| 49 | + } |
| 50 | + yield Ok(Self::full_right_row(left_schema_len, &probe_tuple)); |
| 51 | + continue; |
| 52 | + } |
| 53 | + } |
| 54 | + } |
| 55 | + yield Ok(Tuple::new(pk.clone(), full_values)); |
| 56 | + } |
| 57 | + build_state.is_used = !has_filtered; |
| 58 | + build_state.has_filted = has_filtered; |
| 59 | + return; |
| 60 | + } |
| 61 | + |
| 62 | + yield Ok(Self::full_right_row(left_schema_len, &probe_tuple)); |
| 63 | + }, |
| 64 | + ) |
| 65 | + } |
| 66 | + |
| 67 | + fn left_drop( |
| 68 | + &mut self, |
| 69 | + _build_map: HashMap<BumpVec<'a, DataValue>, BuildState>, |
| 70 | + _filter_args: Option<&'a FilterArgs>, |
| 71 | + ) -> Option<Executor<'a>> { |
| 72 | + let full_schema_len = self.right_schema_len + self.left_schema_len; |
| 73 | + let bits_ptr: *mut FixedBitSet = &mut self.bits; |
| 74 | + |
| 75 | + Some(Box::new( |
| 76 | + #[coroutine] |
| 77 | + move || { |
| 78 | + for (_, state) in _build_map { |
| 79 | + if state.is_used { |
| 80 | + continue; |
| 81 | + } |
| 82 | + for (i, mut left_tuple) in state.tuples { |
| 83 | + unsafe { |
| 84 | + if !(*bits_ptr).contains(i) && state.has_filted { |
| 85 | + continue; |
| 86 | + } |
| 87 | + } |
| 88 | + left_tuple.values.resize(full_schema_len, DataValue::Null); |
| 89 | + yield Ok(left_tuple); |
| 90 | + } |
| 91 | + } |
| 92 | + }, |
| 93 | + )) |
| 94 | + } |
| 95 | +} |
| 96 | + |
| 97 | +impl FullJoinState { |
| 98 | + pub(crate) fn full_right_row(left_schema_len: usize, probe_tuple: &Tuple) -> Tuple { |
| 99 | + let full_values = Vec::from_iter( |
| 100 | + (0..left_schema_len) |
| 101 | + .map(|_| DataValue::Null) |
| 102 | + .chain(probe_tuple.values.iter().cloned()), |
| 103 | + ); |
| 104 | + |
| 105 | + Tuple::new(probe_tuple.pk.clone(), full_values) |
| 106 | + } |
| 107 | +} |
0 commit comments