|
| 1 | +// Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +// or more contributor license agreements. See the NOTICE file |
| 3 | +// distributed with this work for additional information |
| 4 | +// regarding copyright ownership. The ASF licenses this file |
| 5 | +// to you under the Apache License, Version 2.0 (the |
| 6 | +// "License"); you may not use this file except in compliance |
| 7 | +// with the License. You may obtain a copy of the License at |
| 8 | +// |
| 9 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +// |
| 11 | +// Unless required by applicable law or agreed to in writing, |
| 12 | +// software distributed under the License is distributed on an |
| 13 | +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +// KIND, either express or implied. See the License for the |
| 15 | +// specific language governing permissions and limitations |
| 16 | +// under the License. |
| 17 | + |
| 18 | +#pragma once |
| 19 | + |
| 20 | +#include <memory> |
| 21 | +#include <unordered_map> |
| 22 | + |
| 23 | +#include "arrow/compute/exec.h" |
| 24 | +#include "arrow/compute/exec/schema_util.h" |
| 25 | +#include "arrow/compute/kernels/row_encoder.h" |
| 26 | +#include "arrow/result.h" |
| 27 | +#include "arrow/status.h" |
| 28 | +#include "arrow/type.h" |
| 29 | + |
| 30 | +// This file contains hash join logic related to handling of dictionary encoded key |
| 31 | +// columns. |
| 32 | +// |
| 33 | +// A key column from probe side of the join can be matched against a key column from build |
| 34 | +// side of the join, as long as the underlying value types are equal. That means that: |
| 35 | +// - both scalars and arrays can be used and even mixed in the same column |
| 36 | +// - dictionary column can be matched against non-dictionary column if underlying value |
| 37 | +// types are equal |
| 38 | +// - dictionary column can be matched against dictionary column with a different index |
| 39 | +// type, and potentially using a different dictionary, if underlying value types are equal |
| 40 | +// |
| 41 | +// We currently require in hash join that for all dictionary encoded columns, the same |
| 42 | +// dictionary is used in all input exec batches. |
| 43 | +// |
| 44 | +// In order to allow matching columns with different dictionaries, different dictionary |
| 45 | +// index types, and dictionary key against non-dictionary key, internally comparisons will |
| 46 | +// be evaluated after remapping values on both sides of the join to a common |
| 47 | +// representation (which will be called "unified representation"). This common |
| 48 | +// representation is a column of int32() type (not a dictionary column). It represents an |
| 49 | +// index in the unified dictionary computed for the (only) dictionary present on build |
| 50 | +// side (an empty dictionary is still created for an empty build side). Null value is |
| 51 | +// always represented in this common representation as null int32 value, unified |
| 52 | +// dictionary will never contain a null value (so there is no ambiguity of representing |
| 53 | +// nulls as either index to a null entry in the dictionary or null index). |
| 54 | +// |
| 55 | +// Unified dictionary represents values present on build side. There may be values on |
| 56 | +// probe side that are not present in it. All such values, that are not null, are mapped |
| 57 | +// in the common representation to a special constant kMissingValueId. |
| 58 | +// |
| 59 | + |
| 60 | +namespace arrow { |
| 61 | +namespace compute { |
| 62 | + |
| 63 | +using internal::RowEncoder; |
| 64 | + |
| 65 | +/// Helper class with operations that are stateless and common to processing of dictionary |
| 66 | +/// keys on both build and probe side. |
| 67 | +class HashJoinDictUtil { |
| 68 | + public: |
| 69 | + // Null values in unified representation are always represented as null that has |
| 70 | + // corresponding integer set to this constant |
| 71 | + static constexpr int32_t kNullId = 0; |
| 72 | + // Constant representing a value, that is not null, missing on the build side, in |
| 73 | + // unified representation. |
| 74 | + static constexpr int32_t kMissingValueId = -1; |
| 75 | + |
| 76 | + // Check if data types of corresponding pair of key column on build and probe side are |
| 77 | + // compatible |
| 78 | + static bool KeyDataTypesValid(const std::shared_ptr<DataType>& probe_data_type, |
| 79 | + const std::shared_ptr<DataType>& build_data_type); |
| 80 | + |
| 81 | + // Input must be dictionary array or dictionary scalar. |
| 82 | + // A precomputed and provided here lookup table in the form of int32() array will be |
| 83 | + // used to remap input indices to unified representation. |
| 84 | + // |
| 85 | + static Result<std::shared_ptr<ArrayData>> IndexRemapUsingLUT( |
| 86 | + ExecContext* ctx, const Datum& indices, int64_t batch_length, |
| 87 | + const std::shared_ptr<ArrayData>& map_array, |
| 88 | + const std::shared_ptr<DataType>& data_type); |
| 89 | + |
| 90 | + // Return int32() array that contains indices of input dictionary array or scalar after |
| 91 | + // type casting. |
| 92 | + static Result<std::shared_ptr<ArrayData>> ConvertToInt32( |
| 93 | + const std::shared_ptr<DataType>& from_type, const Datum& input, |
| 94 | + int64_t batch_length, ExecContext* ctx); |
| 95 | + |
| 96 | + // Return an array that contains elements of input int32() array after casting to a |
| 97 | + // given integer type. This is used for mapping unified representation stored in the |
| 98 | + // hash table on build side back to original input data type of hash join, when |
| 99 | + // outputting hash join results to parent exec node. |
| 100 | + // |
| 101 | + static Result<std::shared_ptr<ArrayData>> ConvertFromInt32( |
| 102 | + const std::shared_ptr<DataType>& to_type, const Datum& input, int64_t batch_length, |
| 103 | + ExecContext* ctx); |
| 104 | + |
| 105 | + // Return dictionary referenced in either dictionary array or dictionary scalar |
| 106 | + static std::shared_ptr<Array> ExtractDictionary(const Datum& data); |
| 107 | +}; |
| 108 | + |
| 109 | +/// Implements processing of dictionary arrays/scalars in key columns on the build side of |
| 110 | +/// a hash join. |
| 111 | +/// Each instance of this class corresponds to a single column and stores and |
| 112 | +/// processes only the information related to that column. |
| 113 | +/// Const methods are thread-safe, non-const methods are not (the caller must make sure |
| 114 | +/// that only one thread at any time will access them). |
| 115 | +/// |
| 116 | +class HashJoinDictBuild { |
| 117 | + public: |
| 118 | + // Returns true if the key column (described in input by its data type) requires any |
| 119 | + // pre- or post-processing related to handling dictionaries. |
| 120 | + // |
| 121 | + static bool KeyNeedsProcessing(const std::shared_ptr<DataType>& build_data_type) { |
| 122 | + return (build_data_type->id() == Type::DICTIONARY); |
| 123 | + } |
| 124 | + |
| 125 | + // Data type of unified representation |
| 126 | + static std::shared_ptr<DataType> DataTypeAfterRemapping() { return int32(); } |
| 127 | + |
| 128 | + // Should be called only once in hash join, before processing any build or probe |
| 129 | + // batches. |
| 130 | + // |
| 131 | + // Takes a pointer to the dictionary for a corresponding key column on the build side as |
| 132 | + // an input. If the build side is empty, it still needs to be called, but with |
| 133 | + // dictionary pointer set to null. |
| 134 | + // |
| 135 | + // Currently it is required that all input batches on build side share the same |
| 136 | + // dictionary. For each input batch during its pre-processing, dictionary will be |
| 137 | + // checked and error will be returned if it is different then the one provided in the |
| 138 | + // call to this method. |
| 139 | + // |
| 140 | + // Unifies the dictionary. The order of the values is still preserved. |
| 141 | + // Null and duplicate entries are removed. If the dictionary is already unified, its |
| 142 | + // copy will be produced and stored within this class. |
| 143 | + // |
| 144 | + // Prepares the mapping from ids within original dictionary to the ids in the resulting |
| 145 | + // dictionary. This is used later on to pre-process (map to unified representation) key |
| 146 | + // column on build side. |
| 147 | + // |
| 148 | + // Prepares the reverse mapping (in the form of hash table) from values to the ids in |
| 149 | + // the resulting dictionary. This will be used later on to pre-process (map to unified |
| 150 | + // representation) key column on probe side. Values on probe side that are not present |
| 151 | + // in the original dictionary will be mapped to a special constant kMissingValueId. The |
| 152 | + // exception is made for nulls, which get always mapped to nulls (both when null is |
| 153 | + // represented as a dictionary id pointing to a null and a null dictionary id). |
| 154 | + // |
| 155 | + Status Init(ExecContext* ctx, std::shared_ptr<Array> dictionary, |
| 156 | + std::shared_ptr<DataType> index_type, std::shared_ptr<DataType> value_type); |
| 157 | + |
| 158 | + // Remap array or scalar values into unified representation (array of int32()). |
| 159 | + // Outputs kMissingValueId if input value is not found in the unified dictionary. |
| 160 | + // Outputs null for null input value (with corresponding data set to kNullId). |
| 161 | + // |
| 162 | + Result<std::shared_ptr<ArrayData>> RemapInputValues(ExecContext* ctx, |
| 163 | + const Datum& values, |
| 164 | + int64_t batch_length) const; |
| 165 | + |
| 166 | + // Remap dictionary array or dictionary scalar on build side to unified representation. |
| 167 | + // Dictionary referenced in the input must match the dictionary that was |
| 168 | + // given during initialization. |
| 169 | + // The output is a dictionary array that references unified dictionary. |
| 170 | + // |
| 171 | + Result<std::shared_ptr<ArrayData>> RemapInput( |
| 172 | + ExecContext* ctx, const Datum& indices, int64_t batch_length, |
| 173 | + const std::shared_ptr<DataType>& data_type) const; |
| 174 | + |
| 175 | + // Outputs dictionary array referencing unified dictionary, given an array with 32-bit |
| 176 | + // ids. |
| 177 | + // Used to post-process values looked up in a hash table on build side of the hash join |
| 178 | + // before outputting to the parent exec node. |
| 179 | + // |
| 180 | + Result<std::shared_ptr<ArrayData>> RemapOutput(const ArrayData& indices32Bit, |
| 181 | + ExecContext* ctx) const; |
| 182 | + |
| 183 | + // Release shared pointers and memory |
| 184 | + void CleanUp(); |
| 185 | + |
| 186 | + private: |
| 187 | + // Data type of dictionary ids for the input dictionary on build side |
| 188 | + std::shared_ptr<DataType> index_type_; |
| 189 | + // Data type of values for the input dictionary on build side |
| 190 | + std::shared_ptr<DataType> value_type_; |
| 191 | + // Mapping from (encoded as string) values to the ids in unified dictionary |
| 192 | + std::unordered_map<std::string, int32_t> hash_table_; |
| 193 | + // Mapping from input dictionary ids to unified dictionary ids |
| 194 | + std::shared_ptr<ArrayData> remapped_ids_; |
| 195 | + // Input dictionary |
| 196 | + std::shared_ptr<Array> dictionary_; |
| 197 | + // Unified dictionary |
| 198 | + std::shared_ptr<ArrayData> unified_dictionary_; |
| 199 | +}; |
| 200 | + |
| 201 | +/// Implements processing of dictionary arrays/scalars in key columns on the probe side of |
| 202 | +/// a hash join. |
| 203 | +/// Each instance of this class corresponds to a single column and stores and |
| 204 | +/// processes only the information related to that column. |
| 205 | +/// It is not thread-safe - every participating thread should use its own instance of |
| 206 | +/// this class. |
| 207 | +/// |
| 208 | +class HashJoinDictProbe { |
| 209 | + public: |
| 210 | + static bool KeyNeedsProcessing(const std::shared_ptr<DataType>& probe_data_type, |
| 211 | + const std::shared_ptr<DataType>& build_data_type); |
| 212 | + |
| 213 | + // Data type of the result of remapping input key column. |
| 214 | + // |
| 215 | + // The result of remapping is what is used in hash join for matching keys on build and |
| 216 | + // probe side. The exact data types may be different, as described below, and therefore |
| 217 | + // a common representation is needed for simplifying comparisons of pairs of keys on |
| 218 | + // both sides. |
| 219 | + // |
| 220 | + // We support matching key that is of non-dictionary type with key that is of dictionary |
| 221 | + // type, as long as the underlying value types are equal. We support matching when both |
| 222 | + // keys are of dictionary type, regardless whether underlying dictionary index types are |
| 223 | + // the same or not. |
| 224 | + // |
| 225 | + static std::shared_ptr<DataType> DataTypeAfterRemapping( |
| 226 | + const std::shared_ptr<DataType>& build_data_type); |
| 227 | + |
| 228 | + // Should only be called if KeyNeedsProcessing method returns true for a pair of |
| 229 | + // corresponding key columns from build and probe side. |
| 230 | + // Converts values in order to match the common representation for |
| 231 | + // both build and probe side used in hash table comparison. |
| 232 | + // Supports arrays and scalars as input. |
| 233 | + // Argument opt_build_side should be null if dictionary key on probe side is matched |
| 234 | + // with non-dictionary key on build side. |
| 235 | + // |
| 236 | + Result<std::shared_ptr<ArrayData>> RemapInput( |
| 237 | + const HashJoinDictBuild* opt_build_side, const Datum& data, int64_t batch_length, |
| 238 | + const std::shared_ptr<DataType>& probe_data_type, |
| 239 | + const std::shared_ptr<DataType>& build_data_type, ExecContext* ctx); |
| 240 | + |
| 241 | + void CleanUp(); |
| 242 | + |
| 243 | + private: |
| 244 | + // May be null if probe side key is non-dictionary. Otherwise it is used to verify that |
| 245 | + // only a single dictionary is referenced in exec batch on probe side of hash join. |
| 246 | + std::shared_ptr<Array> dictionary_; |
| 247 | + // Mapping from dictionary on probe side of hash join (if it is used) to unified |
| 248 | + // representation. |
| 249 | + std::shared_ptr<ArrayData> remapped_ids_; |
| 250 | + // Encoder of key columns that uses unified representation instead of original data type |
| 251 | + // for key columns that need to use it (have dictionaries on either side of the join). |
| 252 | + internal::RowEncoder encoder_; |
| 253 | +}; |
| 254 | + |
| 255 | +// Encapsulates dictionary handling logic for build side of hash join. |
| 256 | +// |
| 257 | +class HashJoinDictBuildMulti { |
| 258 | + public: |
| 259 | + Status Init(const SchemaProjectionMaps<HashJoinProjection>& proj_map, |
| 260 | + const ExecBatch* opt_non_empty_batch, ExecContext* ctx); |
| 261 | + static void InitEncoder(const SchemaProjectionMaps<HashJoinProjection>& proj_map, |
| 262 | + RowEncoder* encoder, ExecContext* ctx); |
| 263 | + Status EncodeBatch(size_t thread_index, |
| 264 | + const SchemaProjectionMaps<HashJoinProjection>& proj_map, |
| 265 | + const ExecBatch& batch, RowEncoder* encoder, ExecContext* ctx) const; |
| 266 | + Status PostDecode(const SchemaProjectionMaps<HashJoinProjection>& proj_map, |
| 267 | + ExecBatch* decoded_key_batch, ExecContext* ctx); |
| 268 | + const HashJoinDictBuild& get_dict_build(int icol) const { return remap_imp_[icol]; } |
| 269 | + |
| 270 | + private: |
| 271 | + std::vector<bool> needs_remap_; |
| 272 | + std::vector<HashJoinDictBuild> remap_imp_; |
| 273 | +}; |
| 274 | + |
| 275 | +// Encapsulates dictionary handling logic for probe side of hash join |
| 276 | +// |
| 277 | +class HashJoinDictProbeMulti { |
| 278 | + public: |
| 279 | + void Init(size_t num_threads); |
| 280 | + bool BatchRemapNeeded(size_t thread_index, |
| 281 | + const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe, |
| 282 | + const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, |
| 283 | + ExecContext* ctx); |
| 284 | + Status EncodeBatch(size_t thread_index, |
| 285 | + const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe, |
| 286 | + const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, |
| 287 | + const HashJoinDictBuildMulti& dict_build, const ExecBatch& batch, |
| 288 | + RowEncoder** out_encoder, ExecBatch* opt_out_key_batch, |
| 289 | + ExecContext* ctx); |
| 290 | + |
| 291 | + private: |
| 292 | + void InitLocalStateIfNeeded( |
| 293 | + size_t thread_index, const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe, |
| 294 | + const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, ExecContext* ctx); |
| 295 | + static void InitEncoder(const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe, |
| 296 | + const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, |
| 297 | + RowEncoder* encoder, ExecContext* ctx); |
| 298 | + struct ThreadLocalState { |
| 299 | + bool is_initialized; |
| 300 | + // Whether any key column needs remapping (because of dictionaries used) before doing |
| 301 | + // join hash table lookups |
| 302 | + bool any_needs_remap; |
| 303 | + // Whether each key column needs remapping before doing join hash table lookups |
| 304 | + std::vector<bool> needs_remap; |
| 305 | + std::vector<HashJoinDictProbe> remap_imp; |
| 306 | + // Encoder of key columns that uses unified representation instead of original data |
| 307 | + // type for key columns that need to use it (have dictionaries on either side of the |
| 308 | + // join). |
| 309 | + RowEncoder post_remap_encoder; |
| 310 | + }; |
| 311 | + std::vector<ThreadLocalState> local_states_; |
| 312 | +}; |
| 313 | + |
| 314 | +} // namespace compute |
| 315 | +} // namespace arrow |
0 commit comments