Skip to content

Commit 39ae16c

Browse files
committed
Autodetect table to write in Merge engine
1 parent 7033c57 commit 39ae16c

File tree

5 files changed

+120
-3
lines changed

5 files changed

+120
-3
lines changed

src/Storages/StorageMerge.cpp

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ StorageMerge::StorageMerge(
144144
bool database_is_regexp_,
145145
const DBToTableSetMap & source_databases_and_tables_,
146146
const std::optional<String> & table_to_write_,
147+
bool table_to_write_auto_,
147148
ContextPtr context_)
148149
: IStorage(table_id_)
149150
, WithContext(context_->getGlobalContext())
@@ -152,13 +153,15 @@ StorageMerge::StorageMerge(
152153
database_is_regexp_,
153154
source_database_name_or_regexp_, {},
154155
source_databases_and_tables_)
156+
, table_to_write_auto(table_to_write_auto_)
155157
{
156158
StorageInMemoryMetadata storage_metadata;
157159
storage_metadata.setColumns(columns_.empty() ? getColumnsDescriptionFromSourceTables() : columns_);
158160
storage_metadata.setComment(comment);
159161
setInMemoryMetadata(storage_metadata);
160162
setVirtuals(createVirtuals());
161-
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
163+
if (!table_to_write_auto)
164+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
162165
}
163166

164167
StorageMerge::StorageMerge(
@@ -169,6 +172,7 @@ StorageMerge::StorageMerge(
169172
bool database_is_regexp_,
170173
const String & source_table_regexp_,
171174
const std::optional<String> & table_to_write_,
175+
bool table_to_write_auto_,
172176
ContextPtr context_)
173177
: IStorage(table_id_)
174178
, WithContext(context_->getGlobalContext())
@@ -177,13 +181,15 @@ StorageMerge::StorageMerge(
177181
database_is_regexp_,
178182
source_database_name_or_regexp_,
179183
source_table_regexp_, {})
184+
, table_to_write_auto(table_to_write_auto_)
180185
{
181186
StorageInMemoryMetadata storage_metadata;
182187
storage_metadata.setColumns(columns_.empty() ? getColumnsDescriptionFromSourceTables() : columns_);
183188
storage_metadata.setComment(comment);
184189
setInMemoryMetadata(storage_metadata);
185190
setVirtuals(createVirtuals());
186-
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
191+
if (!table_to_write_auto)
192+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
187193
}
188194

189195
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr context_) const
@@ -230,6 +236,29 @@ void StorageMerge::forEachTable(F && func) const
230236
});
231237
}
232238

239+
template <typename F>
240+
void StorageMerge::forEachTableName(F && func) const
241+
{
242+
auto database_table_iterators = database_name_or_regexp.getDatabaseIterators(getContext());
243+
244+
for (auto & iterator : database_table_iterators)
245+
{
246+
while (iterator->isValid())
247+
{
248+
const auto & table = iterator->table();
249+
if (table.get() != this)
250+
{
251+
QualifiedTableName table_name;
252+
table_name.database = iterator->databaseName();
253+
table_name.table = iterator->name();
254+
func(table_name);
255+
}
256+
257+
iterator->next();
258+
}
259+
}
260+
}
261+
233262
bool StorageMerge::isRemote() const
234263
{
235264
auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table && table->isRemote(); });
@@ -1702,6 +1731,18 @@ SinkToStoragePtr StorageMerge::write(
17021731
ContextPtr context_,
17031732
bool async_insert)
17041733
{
1734+
if (table_to_write_auto)
1735+
{
1736+
table_to_write = std::nullopt;
1737+
forEachTableName([&](const auto & table_name)
1738+
{
1739+
if (!table_to_write.has_value())
1740+
table_to_write = table_name;
1741+
else if (table_to_write->getFullName() < table_name.getFullName())
1742+
table_to_write = table_name;
1743+
});
1744+
}
1745+
17051746
if (!table_to_write.has_value())
17061747
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not allowed in storage {} without described table to write.", getName());
17071748

@@ -1738,14 +1779,23 @@ void registerStorageMerge(StorageFactory & factory)
17381779
String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1], "table_name_regexp");
17391780

17401781
std::optional<String> table_to_write = std::nullopt;
1782+
bool table_to_write_auto = false;
17411783
if (size == 3)
17421784
{
1785+
bool is_identifier = engine_args[2]->as<ASTIdentifier>();
17431786
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
17441787
table_to_write = checkAndGetLiteralArgument<String>(engine_args[2], "table_to_write");
1788+
if (is_identifier && table_to_write == "auto")
1789+
{
1790+
if (is_regexp)
1791+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "RegExp for database with auto table_to_write is forbidden.");
1792+
table_to_write_auto = true;
1793+
}
17451794
}
17461795

17471796
return std::make_shared<StorageMerge>(
1748-
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, table_to_write, args.getContext());
1797+
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp,
1798+
table_name_regexp, table_to_write, table_to_write_auto, args.getContext());
17491799
},
17501800
{
17511801
.supports_schema_inference = true

src/Storages/StorageMerge.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class StorageMerge final : public IStorage, WithContext
3131
bool database_is_regexp_,
3232
const DBToTableSetMap & source_databases_and_tables_,
3333
const std::optional<String> & table_to_write_,
34+
bool table_to_write_auto_,
3435
ContextPtr context_);
3536

3637
StorageMerge(
@@ -41,6 +42,7 @@ class StorageMerge final : public IStorage, WithContext
4142
bool database_is_regexp_,
4243
const String & source_table_regexp_,
4344
const std::optional<String> & table_to_write_,
45+
bool table_to_write_auto_,
4446
ContextPtr context_);
4547

4648
std::string getName() const override { return "Merge"; }
@@ -124,13 +126,17 @@ class StorageMerge final : public IStorage, WithContext
124126
DatabaseNameOrRegexp database_name_or_regexp;
125127

126128
std::optional<QualifiedTableName> table_to_write;
129+
bool table_to_write_auto = false;
127130

128131
template <typename F>
129132
StoragePtr getFirstTable(F && predicate) const;
130133

131134
template <typename F>
132135
void forEachTable(F && func) const;
133136

137+
template <typename F>
138+
void forEachTableName(F && func) const;
139+
134140
ColumnSizeByName getColumnSizes() const override;
135141

136142
ColumnsDescription getColumnsDescriptionFromSourceTables() const;

src/TableFunctions/TableFunctionMerge.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, Cont
184184
database_is_regexp,
185185
getSourceDatabasesAndTables(context),
186186
table_to_write,
187+
false,
187188
context);
188189

189190
res->startup();

tests/queries/0_stateless/03373_write_to_merge_table.reference

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,28 @@
1717
2 1
1818
2 2
1919
2 3
20+
1 1
21+
2 1
22+
2 2
23+
2 3
24+
4
25+
2 1
26+
2 2
27+
2 3
28+
3 1
29+
4
30+
2 1
31+
2 2
32+
2 3
33+
3 1
34+
1
35+
3 2
36+
4
37+
2 1
38+
2 2
39+
2 3
40+
3 1
41+
0
42+
2
43+
3 2
44+
3 3

tests/queries/0_stateless/03373_write_to_merge_table.sql

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1+
-- Tags: no-parallel
2+
13
DROP TABLE IF EXISTS test03373_db.test03373_table_1;
24
DROP TABLE IF EXISTS test03373_db.test03373_table_2;
5+
DROP TABLE IF EXISTS test03373_db.test03373_table_3;
6+
DROP TABLE IF EXISTS test03373_db.test03373_table_4;
37
DROP TABLE IF EXISTS test03373_db.test03373_merge_ro;
48
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_1;
59
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_2;
610
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_3;
11+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_auto;
712
DROP DATABASE IF EXISTS test03373_db;
813

914
CREATE DATABASE test03373_db;
@@ -17,6 +22,8 @@ CREATE TABLE test03373_db.test03373_merge_wr_1 (key UInt32, value UInt32) ENGINE
1722
CREATE TABLE test03373_db.test03373_merge_wr_2 (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+', test03373_db.test03373_table_2);
1823
CREATE TABLE test03373_db.test03373_merge_wr_3 (key UInt32, value UInt32) ENGINE=Merge(REGEXP('test03373_.*'), 'test03373_table_\d+', test03373_db.test03373_table_2);
1924

25+
CREATE TABLE test03373_db.test03373_merge_wr_auto (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+', auto);
26+
2027
INSERT INTO test03373_db.test03373_table_1 VALUES (1,1);
2128

2229
INSERT INTO test03373_db.test03373_merge_wr_1 VALUES (2,1);
@@ -30,10 +37,38 @@ SELECT * FROM test03373_db.test03373_merge_wr_1 ORDER BY key, value;
3037
SELECT * FROM test03373_db.test03373_merge_wr_2 ORDER BY key, value;
3138
SELECT * FROM test03373_db.test03373_merge_wr_3 ORDER BY key, value;
3239

40+
SELECT * FROM test03373_db.test03373_merge_wr_auto ORDER BY key, value;
41+
42+
-- insert into test03373_table_2
43+
INSERT INTO test03373_db.test03373_merge_wr_auto VALUES (3,1);
44+
SELECT count() FROM test03373_db.test03373_table_2;
45+
SELECT * FROM test03373_db.test03373_table_2 ORDER BY key, value;
46+
47+
CREATE TABLE test03373_db.test03373_table_4 (key UInt32, value UInt32) ENGINE=MergeTree() ORDER BY key;
48+
-- insert into test03373_table_4
49+
INSERT INTO test03373_db.test03373_merge_wr_auto VALUES (3,2);
50+
SELECT count() FROM test03373_db.test03373_table_2;
51+
SELECT * FROM test03373_db.test03373_table_2 ORDER BY key, value;
52+
SELECT count() FROM test03373_db.test03373_table_4;
53+
SELECT * FROM test03373_db.test03373_table_4 ORDER BY key, value;
54+
55+
CREATE TABLE test03373_db.test03373_table_3 (key UInt32, value UInt32) ENGINE=MergeTree() ORDER BY key;
56+
-- insert into test03373_table_4
57+
INSERT INTO test03373_db.test03373_merge_wr_auto VALUES (3,3);
58+
SELECT count() FROM test03373_db.test03373_table_2;
59+
SELECT * FROM test03373_db.test03373_table_2 ORDER BY key, value;
60+
SELECT count() FROM test03373_db.test03373_table_3;
61+
SELECT * FROM test03373_db.test03373_table_3 ORDER BY key, value;
62+
SELECT count() FROM test03373_db.test03373_table_4;
63+
SELECT * FROM test03373_db.test03373_table_4 ORDER BY key, value;
64+
3365
DROP TABLE IF EXISTS test03373_db.test03373_table_1;
3466
DROP TABLE IF EXISTS test03373_db.test03373_table_2;
67+
DROP TABLE IF EXISTS test03373_db.test03373_table_3;
68+
DROP TABLE IF EXISTS test03373_db.test03373_table_4;
3569
DROP TABLE IF EXISTS test03373_db.test03373_merge_ro;
3670
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_1;
3771
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_2;
3872
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_3;
73+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_auto;
3974
DROP DATABASE IF EXISTS test03373_db;

0 commit comments

Comments
 (0)