Skip to content
1 change: 1 addition & 0 deletions src/Interpreters/ClusterProxy/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ void executeQuery(
not_optimized_cluster->getName());

read_from_remote->setStepDescription("Read from remote replica");
read_from_remote->setIsRemoteFunction(is_remote_function);
plan->addStep(std::move(read_from_remote));
plan->addInterpreterContext(new_context);
plans.emplace_back(std::move(plan));
Expand Down
5 changes: 4 additions & 1 deletion src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2878,8 +2878,11 @@ void Context::setCurrentQueryId(const String & query_id)

client_info.current_query_id = query_id_to_set;

if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
&& (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
{
client_info.initial_query_id = client_info.current_query_id;
}
}

void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)
Expand Down
19 changes: 19 additions & 0 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ObjectFilterStep.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/AggregatingTransform.h>
Expand All @@ -83,6 +84,7 @@
#include <Storages/StorageValues.h>
#include <Storages/StorageView.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Storages/IStorageCluster.h>

#include <Columns/Collator.h>
#include <Columns/ColumnAggregateFunction.h>
Expand Down Expand Up @@ -195,6 +197,7 @@ namespace Setting
extern const SettingsUInt64 max_rows_to_transfer;
extern const SettingsOverflowMode transfer_overflow_mode;
extern const SettingsString implicit_table_at_top_level;
extern const SettingsBool use_hive_partitioning;
}

namespace ServerSetting
Expand Down Expand Up @@ -1972,6 +1975,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P

if (expressions.second_stage || from_aggregation_stage)
{
if (settings[Setting::use_hive_partitioning]
&& !expressions.first_stage
&& expressions.hasWhere())
{
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
{
auto object_filter_step = std::make_unique<ObjectFilterStep>(
query_plan.getCurrentHeader(),
expressions.before_where->dag.clone(),
getSelectQuery().where()->getColumnName());

object_filter_step->setStepDescription("WHERE");
query_plan.addStep(std::move(object_filter_step));
}
}

if (from_aggregation_stage)
{
/// No need to aggregate anything, since this was done on remote shards.
Expand Down
26 changes: 26 additions & 0 deletions src/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPlan/ReadFromRecursiveCTEStep.h>
#include <Processors/QueryPlan/ObjectFilterStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

#include <Interpreters/Context.h>
Expand All @@ -52,6 +53,7 @@
#include <Storages/StorageDistributed.h>
#include <Storages/StorageDummy.h>
#include <Storages/StorageMerge.h>
#include <Storages/IStorageCluster.h>

#include <AggregateFunctions/IAggregateFunction.h>

Expand Down Expand Up @@ -143,6 +145,7 @@ namespace Setting
extern const SettingsUInt64 max_rows_to_transfer;
extern const SettingsOverflowMode transfer_overflow_mode;
extern const SettingsBool enable_parallel_blocks_marshalling;
extern const SettingsBool use_hive_partitioning;
}

namespace ServerSetting
Expand Down Expand Up @@ -452,6 +455,19 @@ void addFilterStep(
query_plan.addStep(std::move(where_step));
}

void addObjectFilterStep(QueryPlan & query_plan,
FilterAnalysisResult & filter_analysis_result,
const std::string & step_description)
{
auto actions = std::move(filter_analysis_result.filter_actions->dag);

auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(),
std::move(actions),
filter_analysis_result.filter_column_name);
where_step->setStepDescription(step_description);
query_plan.addStep(std::move(where_step));
}

Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
const AggregationAnalysisResult & aggregation_analysis_result,
const QueryAnalysisResult & query_analysis_result,
Expand Down Expand Up @@ -1754,6 +1770,16 @@ void Planner::buildPlanForQueryNode()

if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
{
if (settings[Setting::use_hive_partitioning]
&& !query_processing_info.isFirstStage()
&& expression_analysis_result.hasWhere())
{
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
{
addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
}
}

if (query_processing_info.isFromAggregationState())
{
/// Aggregation was performed on remote shards
Expand Down
63 changes: 63 additions & 0 deletions src/Processors/QueryPlan/ObjectFilterStep.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include <Processors/QueryPlan/ObjectFilterStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/Transforms/FilterTransform.h>
#include <IO/Operators.h>

#include <memory>

namespace DB
{

namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}

ObjectFilterStep::ObjectFilterStep(
const Header & input_header_,
ActionsDAG actions_dag_,
String filter_column_name_)
: actions_dag(std::move(actions_dag_))
, filter_column_name(std::move(filter_column_name_))
{
input_headers.emplace_back(input_header_);
output_header = input_headers.front();
}

QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */)
{
return std::move(pipelines.front());
}

void ObjectFilterStep::updateOutputHeader()
{
output_header = input_headers.front();
}

void ObjectFilterStep::serialize(Serialization & ctx) const
{
writeStringBinary(filter_column_name, ctx.out);

actions_dag.serialize(ctx.out, ctx.registry);
}

std::unique_ptr<IQueryPlanStep> ObjectFilterStep::deserialize(Deserialization & ctx)
{
if (ctx.input_headers.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "ObjectFilterStep must have one input stream");

String filter_column_name;
readStringBinary(filter_column_name, ctx.in);

ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);

return std::make_unique<ObjectFilterStep>(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name));
}

void registerObjectFilterStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("ObjectFilter", ObjectFilterStep::deserialize);
}

}
35 changes: 35 additions & 0 deletions src/Processors/QueryPlan/ObjectFilterStep.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Interpreters/ActionsDAG.h>

namespace DB
{

/// Implements WHERE operation.
class ObjectFilterStep : public IQueryPlanStep
{
public:
ObjectFilterStep(
const Header & input_header_,
ActionsDAG actions_dag_,
String filter_column_name_);

String getName() const override { return "ObjectFilter"; }
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override;

const ActionsDAG & getExpression() const { return actions_dag; }
ActionsDAG & getExpression() { return actions_dag; }
const String & getFilterColumnName() const { return filter_column_name; }

void serialize(Serialization & ctx) const override;

static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);

private:
void updateOutputHeader() override;

ActionsDAG actions_dag;
String filter_column_name;
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Processors/QueryPlan/ObjectFilterStep.h>

namespace DB::QueryPlanOptimizations
{
Expand Down Expand Up @@ -41,6 +42,10 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack)
/// So this is likely not needed.
continue;
}
else if (auto * object_filter_step = typeid_cast<ObjectFilterStep *>(iter->node->step.get()))
{
source_step_with_filter->addFilter(object_filter_step->getExpression().clone(), object_filter_step->getFilterColumnName());
}
else
{
break;
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/QueryPlan/QueryPlanStepRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void registerFilterStep(QueryPlanStepRegistry & registry);
void registerTotalsHavingStep(QueryPlanStepRegistry & registry);
void registerExtremesStep(QueryPlanStepRegistry & registry);
void registerJoinStep(QueryPlanStepRegistry & registry);
void registerObjectFilterStep(QueryPlanStepRegistry & registry);

void registerReadFromTableStep(QueryPlanStepRegistry & registry);
void registerReadFromTableFunctionStep(QueryPlanStepRegistry & registry);
Expand All @@ -73,6 +74,7 @@ void QueryPlanStepRegistry::registerPlanSteps()

registerReadFromTableStep(registry);
registerReadFromTableFunctionStep(registry);
registerObjectFilterStep(registry);
}

}
9 changes: 8 additions & 1 deletion src/Processors/QueryPlan/ReadFromRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,8 @@ void ReadFromRemote::addLazyPipe(
my_stage = stage, my_storage = storage,
add_agg_info, add_totals, add_extremes, async_read, async_query_sending,
query_tree = shard.query_tree, planner_context = shard.planner_context,
pushed_down_filters, parallel_marshalling_threads]() mutable
pushed_down_filters, parallel_marshalling_threads,
my_is_remote_function = is_remote_function]() mutable
-> QueryPipelineBuilder
{
auto current_settings = my_context->getSettingsRef();
Expand Down Expand Up @@ -597,6 +598,8 @@ void ReadFromRemote::addLazyPipe(
{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, stage_to_use, my_shard.query_plan);
remote_query_executor->setRemoteFunction(my_is_remote_function);
remote_query_executor->setShardCount(my_shard_count);

auto pipe = createRemoteSourcePipe(
remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, parallel_marshalling_threads);
Expand Down Expand Up @@ -687,6 +690,8 @@ void ReadFromRemote::addPipe(
priority_func);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
remote_query_executor->setRemoteFunction(is_remote_function);
remote_query_executor->setShardCount(shard_count);

if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
Expand All @@ -707,6 +712,8 @@ void ReadFromRemote::addPipe(
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage_to_use, shard.query_plan);
remote_query_executor->setLogger(log);
remote_query_executor->setRemoteFunction(is_remote_function);
remote_query_executor->setShardCount(shard_count);

if (context->canUseTaskBasedParallelReplicas() || parallel_replicas_disabled)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/QueryPlan/ReadFromRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase

void enableMemoryBoundMerging();
void enforceAggregationInOrder(const SortDescription & sort_description);
void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }

bool hasSerializedPlan() const;

Expand All @@ -63,6 +64,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
UInt32 shard_count;
const String cluster_name;
std::optional<GetPriorityForLoadBalancing> priority_func_factory;
bool is_remote_function = false;

Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const Header & out_header);

Expand Down
11 changes: 10 additions & 1 deletion src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,16 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As

auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context->getClientInfo();
modified_client_info.query_kind = query_kind;

/// Doesn't support now "remote('1.1.1.{1,2}')""
if (is_remote_function && (shard_count == 1))
{
modified_client_info.setInitialQuery();
modified_client_info.client_name = "ClickHouse server";
modified_client_info.interface = ClientInfo::Interface::TCP;
}
else
modified_client_info.query_kind = query_kind;

if (!duplicated_part_uuids.empty())
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
Expand Down
7 changes: 7 additions & 0 deletions src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ class RemoteQueryExecutor

void setLogger(LoggerPtr logger) { log = logger; }

void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }

void setShardCount(UInt32 shard_count_) { shard_count = shard_count_; }

const Block & getHeader() const { return header; }

IConnections & getConnections() { return *connections; }
Expand Down Expand Up @@ -312,6 +316,9 @@ class RemoteQueryExecutor
bool packet_in_progress = false;
#endif

bool is_remote_function = false;
UInt32 shard_count = 0;

/// Parts uuids, collected from remote replicas
std::vector<UUID> duplicated_part_uuids;

Expand Down
Loading
Loading