Skip to content

Commit a0bde0a

Browse files
committed
[174]: resolved requests and updated comments
Mostly minor changes. Only major change is replacing the use of hashing32 from key_hash.c with ScalarHelper from hashing.h
1 parent 6a33df4 commit a0bde0a

File tree

1 file changed

+58
-95
lines changed

1 file changed

+58
-95
lines changed

cpp/code/compute_fn.cc

Lines changed: 58 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
// arrow dependencies
1010
#include <arrow/api.h>
1111
#include <arrow/compute/api.h>
12-
#include <arrow/compute/exec/key_hash.h>
12+
#include <arrow/util/hashing.h>
1313

1414
#include "common.h"
1515

@@ -18,48 +18,34 @@
1818
using std::shared_ptr;
1919
using std::vector;
2020

21-
// arrow util types
21+
// >> commonly used arrow types
22+
// |> general programming support
2223
using arrow::Result;
2324
using arrow::Status;
2425
using arrow::Datum;
2526

26-
// arrow data types and helpers
27-
using arrow::UInt32Builder;
28-
using arrow::Int32Builder;
29-
27+
// |> arrow data types and helpers
28+
using arrow::Int64Builder;
3029
using arrow::Array;
3130
using arrow::ArraySpan;
3231

3332

34-
// aliases for types used in `NamedScalarFn`
33+
// >> aliases for types used to define a custom function (e.g. `NamedScalarFn`)
3534
// |> kernel parameters
3635
using arrow::compute::KernelContext;
3736
using arrow::compute::ExecSpan;
3837
using arrow::compute::ExecResult;
3938

40-
// |> other context types
41-
using arrow::compute::ExecContext;
42-
using arrow::compute::LightContext;
43-
44-
// |> common types for compute functions
45-
using arrow::compute::FunctionRegistry;
39+
// |> for defining compute functions and their compute kernels
4640
using arrow::compute::FunctionDoc;
4741
using arrow::compute::InputType;
4842
using arrow::compute::OutputType;
4943
using arrow::compute::Arity;
50-
51-
// |> the "kind" of function we want
5244
using arrow::compute::ScalarFunction;
5345

54-
// |> structs and classes for hashing
55-
using arrow::util::MiniBatch;
56-
using arrow::util::TempVectorStack;
57-
58-
using arrow::compute::KeyColumnArray;
59-
using arrow::compute::Hashing32;
60-
61-
// |> functions used for hashing
62-
using arrow::compute::ColumnArrayFromArrayData;
46+
// |> for adding to a function registry or using `CallFunction`
47+
using arrow::compute::FunctionRegistry;
48+
using arrow::compute::ExecContext;
6349

6450

6551
// ------------------------------
@@ -69,12 +55,13 @@ using arrow::compute::ColumnArrayFromArrayData;
6955
/**
7056
* Create a const instance of `FunctionDoc` that contains 3 attributes:
7157
* 1. Short description
72-
* 2. Long description (limited to 78 characters)
58+
* 2. Long description (can be multiple lines, each limited to 78 characters in width)
7359
* 3. Name of input arguments
7460
*/
7561
const FunctionDoc named_scalar_fn_doc {
76-
"Unary function that calculates a hash for each row of the input"
77-
,"This function uses an xxHash-like algorithm which produces 32-bit hashes."
62+
"Unary function that calculates a hash for each element of the input"
63+
,("This function uses the xxHash algorithm.\n"
64+
"The result contains a 64-bit hash value for each input element.")
7865
,{ "input_array" }
7966
};
8067

@@ -93,64 +80,43 @@ struct NamedScalarFn {
9380

9481
/**
9582
* A kernel implementation that expects a single array as input, and outputs an array of
96-
* uint32 values. We write this implementation knowing what function we want to
83+
* int64 values. We write this implementation knowing what function we want to
9784
* associate it with ("NamedScalarFn"), but that association is made later (see
9885
* `RegisterScalarFnKernels()` below).
9986
*/
10087
static Status
10188
Exec(KernelContext *ctx, const ExecSpan &input_arg, ExecResult *out) {
10289
StartRecipe("DefineAComputeKernel");
10390

104-
if (input_arg.num_values() != 1 or not input_arg[0].is_array()) {
91+
// Validate inputs
92+
if (input_arg.num_values() != 1 or !input_arg[0].is_array()) {
10593
return Status::Invalid("Unsupported argument types or shape");
10694
}
10795

108-
// >> Initialize stack-based memory allocator with an allocator and memory size
109-
TempVectorStack stack_memallocator;
110-
auto input_dtype_width = input_arg[0].type()->bit_width();
111-
if (input_dtype_width > 0) {
112-
ARROW_RETURN_NOT_OK(
113-
stack_memallocator.Init(
114-
ctx->exec_context()->memory_pool()
115-
,input_dtype_width * max_batchsize
116-
)
96+
// The input ArraySpan manages data as 3 buffers; the data buffer has index `1`
97+
constexpr int bufndx_data = 1;
98+
const int64_t *hash_inputs = input_arg[0].array.GetValues<int64_t>(bufndx_data);
99+
const auto input_len = input_arg[0].array.length;
100+
101+
// Allocate an Arrow buffer for output
102+
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> hash_buffer,
103+
AllocateBuffer(input_len * sizeof(int64_t)));
104+
105+
// Call hashing function, using both prime multipliers from xxHash
106+
int64_t *hash_results = reinterpret_cast<int64_t*>(hash_buffer->mutable_data());
107+
for (int val_ndx = 0; val_ndx < input_len; ++val_ndx) {
108+
hash_results[val_ndx] = (
109+
ScalarHelper<int64_t, 0>::ComputeHash(hash_inputs[val_ndx])
110+
+ ScalarHelper<int64_t, 1>::ComputeHash(hash_inputs[val_ndx])
117111
);
118112
}
119113

120-
// >> Prepare input data structure for propagation to hash function
121-
// NOTE: "start row index" and "row count" can potentially be options in the future
122-
ArraySpan hash_input = input_arg[0].array;
123-
int64_t hash_startrow = 0;
124-
int64_t hash_rowcount = hash_input.length;
125-
ARROW_ASSIGN_OR_RAISE(
126-
KeyColumnArray input_keycol
127-
,ColumnArrayFromArrayData(hash_input.ToArrayData(), hash_startrow, hash_rowcount)
128-
);
129-
130-
// >> Call hashing function
131-
vector<uint32_t> hash_results;
132-
hash_results.resize(hash_input.length);
133-
134-
LightContext hash_ctx;
135-
hash_ctx.hardware_flags = ctx->exec_context()->cpu_info()->hardware_flags();
136-
hash_ctx.stack = &stack_memallocator;
137-
138-
Hashing32::HashMultiColumn({ input_keycol }, &hash_ctx, hash_results.data());
139-
140-
// >> Prepare results of hash function for kernel output argument
141-
UInt32Builder builder;
142-
builder.Reserve(hash_results.size());
143-
builder.AppendValues(hash_results);
144-
145-
ARROW_ASSIGN_OR_RAISE(auto result_array, builder.Finish());
146-
out->value = result_array->data();
114+
// Use ArrayData (not ArraySpan) for ownership of result buffer
115+
out->value = ArrayData{int64(), input_len, {nullptr, std::move(hash_buffer)}};
147116

148117
EndRecipe("DefineAComputeKernel");
149118
return Status::OK();
150119
}
151-
152-
153-
static constexpr uint32_t max_batchsize = MiniBatch::kMiniBatchLength;
154120
};
155121

156122

@@ -172,25 +138,24 @@ struct NamedScalarFn {
172138
*/
173139
shared_ptr<ScalarFunction>
174140
RegisterScalarFnKernels() {
175-
StartRecipe("AddKernelsToFunction");
141+
StartRecipe("AddKernelToFunction");
176142
// Instantiate a function to be registered
177143
auto fn_named_scalar = std::make_shared<ScalarFunction>(
178144
"named_scalar_fn"
179145
,Arity::Unary()
180146
,std::move(named_scalar_fn_doc)
181147
);
182148

183-
// Associate a kernel implementation with the function using
184-
// `ScalarFunction::AddKernel()`
149+
// Associate a function and kernel using `ScalarFunction::AddKernel()`
185150
DCHECK_OK(
186151
fn_named_scalar->AddKernel(
187-
{ InputType(arrow::int32()) }
188-
,OutputType(arrow::uint32())
152+
{ InputType(arrow::int64()) }
153+
,OutputType(arrow::int64())
189154
,NamedScalarFn::Exec
190155
)
191156
);
157+
EndRecipe("AddKernelToFunction");
192158

193-
EndRecipe("AddKernelsToFunction");
194159
return fn_named_scalar;
195160
}
196161

@@ -209,9 +174,9 @@ RegisterNamedScalarFn(FunctionRegistry *registry) {
209174

210175
// >> Convenience functions
211176
/**
212-
* An optional convenience function to easily invoke our compute function. This executes
213-
* our compute function by invoking `CallFunction` with the name that we used to register
214-
* the function ("named_scalar_fn" in this case).
177+
* An optional, convenient invocation function to easily call our compute function. This
178+
* executes our compute function by invoking `CallFunction` with the name that we used to
179+
* register the function ("named_scalar_fn" in this case).
215180
*/
216181
ARROW_EXPORT
217182
Result<Datum>
@@ -223,9 +188,9 @@ NamedScalarFn(const Datum &input_arg, ExecContext *ctx) {
223188

224189
Result<shared_ptr<Array>>
225190
BuildIntArray() {
226-
vector<int32_t> col_vals { 0, 1, 1, 2, 3, 5, 8, 13, 21, 34 };
191+
vector<int64_t> col_vals { 0, 1, 1, 2, 3, 5, 8, 13, 21, 34 };
227192

228-
Int32Builder builder;
193+
Int64Builder builder;
229194
ARROW_RETURN_NOT_OK(builder.Reserve(col_vals.size()));
230195
ARROW_RETURN_NOT_OK(builder.AppendValues(col_vals));
231196
return builder.Finish();
@@ -235,27 +200,22 @@ BuildIntArray() {
235200
class ComputeFunctionTest : public ::testing::Test {};
236201

237202
TEST(ComputeFunctionTest, TestRegisterAndCallFunction) {
238-
// >> Construct some test data
203+
// >> Register the function first
204+
StartRecipe("RegisterComputeFunction");
205+
auto fn_registry = arrow::compute::GetFunctionRegistry();
206+
RegisterNamedScalarFn(fn_registry);
207+
EndRecipe("RegisterComputeFunction");
208+
209+
// >> Then we can call the function
210+
StartRecipe("InvokeComputeFunction");
239211
auto build_result = BuildIntArray();
240212
if (not build_result.ok()) {
241213
std::cerr << build_result.status().message() << std::endl;
242214
return 1;
243215
}
244216

245-
// >> Peek at the data
246-
auto col_vals = *build_result;
247-
std::cout << col_vals->ToString() << std::endl;
248-
249-
// >> Invoke compute function
250-
StartRecipe("RegisterAndCallComputeFunction");
251-
// |> First, register
252-
auto fn_registry = arrow::compute::GetFunctionRegistry();
253-
RegisterNamedScalarFn(fn_registry);
254-
255-
256-
// |> Then, invoke
257-
Datum col_as_datum { col_vals };
258-
auto fn_result = NamedScalarFn(col_as_datum);
217+
Datum col_data { *build_result };
218+
auto fn_result = NamedScalarFn(col_data);
259219
if (not fn_result.ok()) {
260220
std::cerr << fn_result.status().message() << std::endl;
261221
return 2;
@@ -264,7 +224,10 @@ TEST(ComputeFunctionTest, TestRegisterAndCallFunction) {
264224
auto result_data = fn_result->make_array();
265225
std::cout << "Success:" << std::endl;
266226
std::cout << "\t" << result_data->ToString() << std::endl;
227+
EndRecipe("InvokeComputeFunction");
228+
229+
// If we want to peek at the input data
230+
std::cout << col_data.make_array()->ToString() << std::endl;
267231

268-
EndRecipe("RegisterAndCallComputeFunction");
269232
return 0;
270233
}

0 commit comments

Comments
 (0)