Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 61 additions & 28 deletions velox/exec/fuzzer/JoinFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
// Example invocations of the join fuzzer:
//   ./velox/exec/fuzzer/velox_join_fuzzer --enable_spill=false --v=1 --batch_size=100 --num_batches=1 --steps=10 --seed=2
//   ./velox/exec/fuzzer/velox_join_fuzzer --enable_spill=false --v=1 --batch_size=100 --num_batches=10 --steps=1 --seed=2

DEFINE_int32(steps, 10, "Number of plans to generate and test.");

Expand Down Expand Up @@ -72,6 +76,21 @@ std::string makePercentageString(size_t value, size_t total) {
return fmt::format("{} ({:.2f}%)", value, (double)value / total * 100);
}

// Fixed list of scalar types handed to VectorFuzzer::randType() when picking
// join key and payload column types, in place of the reference query runner's
// supportedScalarTypes().
// The list is only ever read, so it is declared const to prevent accidental
// mutation of shared global state.
// NOTE(review): the commented-out entries are presumably types the cuDF
// operators do not handle yet -- confirm before re-enabling them.
static const std::vector<TypePtr> kScalarTypes{
    BOOLEAN(),
    TINYINT(),
    SMALLINT(),
    INTEGER(),
    BIGINT(),
    REAL(),
    DOUBLE(),
    VARCHAR(),
    // VARBINARY(),
    // TIMESTAMP(),
    // DATE(),
    // INTERVAL_DAY_TIME(),
};

class JoinFuzzer {
public:
JoinFuzzer(
Expand Down Expand Up @@ -216,6 +235,8 @@ JoinFuzzer::JoinFuzzer(
dwrf::registerDwrfReaderFactory();
dwrf::registerDwrfWriterFactory();

// print seed
std::cout << "Seed: " << initialSeed << std::endl;
seed(initialSeed);
}

Expand Down Expand Up @@ -249,7 +270,8 @@ std::vector<TypePtr> JoinFuzzer::generateJoinKeyTypes(int32_t numKeys) {
for (auto i = 0; i < numKeys; ++i) {
// Pick random scalar type.
types.push_back(vectorFuzzer_.randType(
referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/0));
// referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/0));
kScalarTypes, /*maxDepth=*/0));
}
return types;
}
Expand All @@ -273,7 +295,8 @@ std::vector<RowVectorPtr> JoinFuzzer::generateProbeInput(
for (auto i = 0; i < numPayload; ++i) {
names.push_back(fmt::format("tp{}", i + keyNames.size()));
types.push_back(vectorFuzzer_.randType(
referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/2));
// referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/2));
kScalarTypes, /*maxDepth=*/0));
}

const auto inputType = ROW(std::move(names), std::move(types));
Expand Down Expand Up @@ -307,7 +330,8 @@ std::vector<RowVectorPtr> JoinFuzzer::generateBuildInput(
for (auto i = 0; i < numPayload; ++i) {
names.push_back(fmt::format("bp{}", i + buildKeys.size()));
types.push_back(vectorFuzzer_.randType(
referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/2));
// referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/2));
kScalarTypes, /*maxDepth=*/0));
}

const auto rowType = ROW(std::move(names), std::move(types));
Expand Down Expand Up @@ -369,6 +393,12 @@ RowVectorPtr JoinFuzzer::execute(
<< ": " << std::endl
<< plan.plan->toString(true, true);

// Print the plan for debugging purposes
std::stringstream planStream;
planStream << "Plan #" << ":\n";
planStream << plan.plan->toString(true, true);
std::cout << planStream.str() << std::endl;

test::AssertQueryBuilder builder(plan.plan);
for (const auto& [planNodeId, nodeSplits] : plan.splits) {
builder.splits(planNodeId, nodeSplits);
Expand Down Expand Up @@ -400,8 +430,9 @@ RowVectorPtr JoinFuzzer::execute(

TestScopedSpillInjection scopedSpillInjection(spillPct);
RowVectorPtr result;
TaskStats stats;
try {
result = builder.maxDrivers(2).copyResults(pool_.get());
std::tie(result, stats) = builder.maxDrivers(2).copyResultsWithStats(pool_.get());
} catch (VeloxRuntimeError& e) {
if (FLAGS_enable_oom_injection &&
e.errorCode() == facebook::velox::error_code::kMemCapExceeded &&
Expand All @@ -421,6 +452,8 @@ RowVectorPtr JoinFuzzer::execute(
// avoid the potential interference of the background activities across query
// executions.
test::waitForAllTasksToBeDeleted();
std::cout << exec::printPlanWithStats(*plan.plan, stats, true)
<< std::endl;
return result;
}

Expand Down Expand Up @@ -521,13 +554,13 @@ RowVectorPtr JoinFuzzer::testCrossProduct(

std::vector<JoinMaker::PlanWithSplits> altPlans;
if (joinMaker.supportsTableScan()) {
altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan(
JoinMaker::JoinOrder::NATURAL));
// altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan(
// JoinMaker::JoinOrder::NATURAL));
}

if (joinMaker.supportsFlippingNestedLoopJoin()) {
altPlans.push_back(
joinMaker.makeNestedLoopJoin(inputType, JoinMaker::JoinOrder::FLIPPED));
// altPlans.push_back(
// joinMaker.makeNestedLoopJoin(inputType, JoinMaker::JoinOrder::FLIPPED));
}

for (const auto& altPlan : altPlans) {
Expand Down Expand Up @@ -568,17 +601,17 @@ void addPlansForInputType(
plans.push_back(
joinMaker.makeMergeJoin(inputType, JoinMaker::JoinOrder::NATURAL));
if (joinMaker.supportsFlippingMergeJoin()) {
plans.push_back(
joinMaker.makeMergeJoin(inputType, JoinMaker::JoinOrder::FLIPPED));
// plans.push_back(
// joinMaker.makeMergeJoin(inputType, JoinMaker::JoinOrder::FLIPPED));
}
}

if (joinMaker.supportsNestedLoopJoin()) {
plans.push_back(
joinMaker.makeNestedLoopJoin(inputType, JoinMaker::JoinOrder::NATURAL));
// plans.push_back(
// joinMaker.makeNestedLoopJoin(inputType, JoinMaker::JoinOrder::NATURAL));
if (joinMaker.supportsFlippingNestedLoopJoin()) {
plans.push_back(joinMaker.makeNestedLoopJoin(
inputType, JoinMaker::JoinOrder::FLIPPED));
// plans.push_back(joinMaker.makeNestedLoopJoin(
// inputType, JoinMaker::JoinOrder::FLIPPED));
}
}
}
Expand Down Expand Up @@ -676,14 +709,14 @@ void JoinFuzzer::verify(core::JoinType joinType) {
"" // It's a cross join, so no filter.
);

auto result = testCrossProduct(
crossJoinMaker,
JoinMaker::InputType::ENCODED,
probeSource,
buildSource);
auto flatResult = testCrossProduct(
crossJoinMaker, JoinMaker::InputType::FLAT, probeSource, buildSource);
test::assertEqualResults({result}, {flatResult});
// auto result = testCrossProduct(
// crossJoinMaker,
// JoinMaker::InputType::ENCODED,
// probeSource,
// buildSource);
// auto flatResult = testCrossProduct(
// crossJoinMaker, JoinMaker::InputType::FLAT, probeSource, buildSource);
// test::assertEqualResults({result}, {flatResult});
}
}

Expand Down Expand Up @@ -756,8 +789,8 @@ void JoinFuzzer::verify(core::JoinType joinType) {
altPlans.push_back(joinMaker.makeHashJoinWithTableScan(
std::nullopt, JoinMaker::JoinOrder::FLIPPED));
// Use grouped execution.
altPlans.push_back(joinMaker.makeHashJoinWithTableScan(
numGroups, JoinMaker::JoinOrder::FLIPPED));
// altPlans.push_back(joinMaker.makeHashJoinWithTableScan(
// numGroups, JoinMaker::JoinOrder::FLIPPED));
}

if (joinMaker.supportsMergeJoin()) {
Expand All @@ -770,11 +803,11 @@ void JoinFuzzer::verify(core::JoinType joinType) {
}

if (joinMaker.supportsNestedLoopJoin()) {
altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan(
JoinMaker::JoinOrder::NATURAL));
// altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan(
// JoinMaker::JoinOrder::NATURAL));
if (joinMaker.supportsFlippingNestedLoopJoin()) {
altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan(
JoinMaker::JoinOrder::FLIPPED));
// altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan(
// JoinMaker::JoinOrder::FLIPPED));
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/fuzzer/JoinFuzzerRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include "velox/exec/fuzzer/JoinFuzzer.h"
#include "velox/exec/fuzzer/ReferenceQueryRunner.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/parse/TypeResolver.h"
#include "velox/serializers/CompactRowSerializer.h"
Expand Down Expand Up @@ -116,5 +117,9 @@ int main(int argc, char** argv) {
facebook::velox::serializer::spark::UnsafeRowVectorSerde::
registerNamedVectorSerde();
}
// Register cuDF
facebook::velox::cudf_velox::registerCudf();

joinFuzzer(initialSeed, std::move(referenceQueryRunner));
facebook::velox::cudf_velox::unregisterCudf();
}
14 changes: 7 additions & 7 deletions velox/exec/fuzzer/JoinMaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,17 +391,17 @@ JoinMaker::PlanWithSplits JoinMaker::makeHashJoin(
test::PlanBuilder probeSourcePlan;
test::PlanBuilder buildSourcePlan;

if (partitionStrategy == PartitionStrategy::NONE) {
// if (partitionStrategy == PartitionStrategy::NONE) {
probeSourcePlan =
makeJoinSourcePlan(probeSource, inputType, planNodeIdGenerator);
buildSourcePlan =
makeJoinSourcePlan(buildSource, inputType, planNodeIdGenerator);
} else {
probeSourcePlan = makePartitionedJoinSourcePlan(
partitionStrategy, probeSource, inputType, planNodeIdGenerator);
buildSourcePlan = makePartitionedJoinSourcePlan(
partitionStrategy, buildSource, inputType, planNodeIdGenerator);
}
// } else {
// probeSourcePlan = makePartitionedJoinSourcePlan(
// partitionStrategy, probeSource, inputType, planNodeIdGenerator);
// buildSourcePlan = makePartitionedJoinSourcePlan(
// partitionStrategy, buildSource, inputType, planNodeIdGenerator);
// }

return PlanWithSplits(makeHashJoinPlan(
probeSourcePlan,
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/fuzzer/JoinMaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,13 @@ class JoinMaker {
bool supportsFlippingNestedLoopJoin() const;

bool supportsMergeJoin() const {
return core::MergeJoinNode::isSupported(joinType_);
return false;
// return core::MergeJoinNode::isSupported(joinType_);
}

bool supportsNestedLoopJoin() const {
return core::NestedLoopJoinNode::isSupported(joinType_);
return false;
// return core::NestedLoopJoinNode::isSupported(joinType_);
}

/// Returns whether or not the types of the sources allow them to be read as
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/tests/utils/AssertQueryBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ RowVectorPtr AssertQueryBuilder::copyResults(memory::MemoryPool* pool) {
return copyResults(pool, unused);
}

/// Runs the query through copyResults(pool, task) and returns the copied
/// result together with the stats reported by the task that executed it.
///
/// @param pool Memory pool forwarded to copyResults().
/// @return Pair of (copied result vector, task stats).
std::pair<RowVectorPtr, TaskStats> AssertQueryBuilder::copyResultsWithStats(
    memory::MemoryPool* pool) {
  // copyResults() hands the executing task back through this out-parameter.
  std::shared_ptr<Task> task;
  auto result = copyResults(pool, task);
  // NOTE(review): assumes copyResults() always populates 'task' when it
  // returns normally -- confirm against its implementation.
  // Build the pair in place to avoid extra copies of the vector pointer and
  // the stats object.
  return {std::move(result), task->taskStats()};
}


RowVectorPtr AssertQueryBuilder::copyResults(
memory::MemoryPool* pool,
std::shared_ptr<Task>& task) {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/tests/utils/AssertQueryBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ class AssertQueryBuilder {
/// query returns empty result.
RowVectorPtr copyResults(memory::MemoryPool* pool);

/// Same as copyResults(pool), but additionally returns the TaskStats of the
/// task that ran the query.
std::pair<RowVectorPtr, TaskStats> copyResultsWithStats(memory::MemoryPool* pool);

/// Similar to above method and also returns the task.
RowVectorPtr copyResults(
memory::MemoryPool* pool,
Expand Down
Loading
Loading