Skip to content

Commit b2c1406

Browse files
obdevCharles0429simonjoyleta1iive
authored andcommitted
Merge branch 'dag_patch_lite' into 'master'
Co-authored-by: Charles0429 <[email protected]> Co-authored-by: simonjoylet <[email protected]> Co-authored-by: a1iive <[email protected]>
1 parent 2c524b5 commit b2c1406

File tree

555 files changed

+61871
-8055
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

555 files changed

+61871
-8055
lines changed

deps/oblib/src/lib/list/ob_dlist.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class ObDList
106106
DLinkNode *remove_first();
107107

108108
void push_range(ObDList<DLinkNode> &range);
109+
void push_back_range(ObDList<DLinkNode> &range);
109110
void pop_range(int32_t num, ObDList<DLinkNode> &range);
110111

111112
//the list is empty or not
@@ -292,6 +293,24 @@ void ObDList<DLinkNode>::push_range(ObDList<DLinkNode> &range)
292293
}
293294
}
294295
template <typename DLinkNode>
296+
void ObDList<DLinkNode>::push_back_range(ObDList<DLinkNode> &range)
297+
{
298+
if (!range.is_empty()) {
299+
if (is_empty()) {
300+
push_range(range);
301+
} else {
302+
DLinkNode *tail = header_.prev_;
303+
DLinkNode *first = range.header_.next_;
304+
DLinkNode *last = range.header_.prev_;
305+
first->prev_ = NULL;
306+
last->next_ = NULL;
307+
tail->add_range_after(first, last);
308+
size_ += range.get_size();
309+
range.reset();
310+
}
311+
}
312+
}
313+
template <typename DLinkNode>
295314
void ObDList<DLinkNode>::pop_range(int32_t num, ObDList<DLinkNode> &range)
296315
{
297316
DLinkNode *first = this->header_.next_;

deps/oblib/src/lib/ob_errno.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ constexpr int OB_ERR_DUPLICATE_INDEX = -9137;
438438
constexpr int OB_INVALID_KMS_DEST = -9139;
439439
constexpr int OB_OBJECT_STORAGE_OBJECT_LOCKED_BY_WORM = -9140;
440440
constexpr int OB_OBJECT_STORAGE_OVERWRITE_CONTENT_MISMATCH = -9142;
441+
constexpr int OB_DAG_TASK_IS_SUSPENDED = -9143;
441442
constexpr int OB_ERR_XML_PARSE = -9549;
442443
constexpr int OB_ERR_XSLT_PARSE = -9574;
443444
constexpr int OB_HDFS_CONNECT_FS_ERROR = -11019;

deps/oblib/src/lib/queue/ob_lighty_queue.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class LightyQueue
9999
bool is_inited() const { return queue_.is_inited(); }
100100
int push(void *data, const int64_t timeout = 0);
101101
int pop(void *&data, const int64_t timeout = 0);
102+
int64_t to_string(char* buf, const int64_t buf_len) const { return 0; }
102103
private:
103104
typedef ObOrderedFixedQueue<void> Queue;
104105
Queue queue_;

deps/oblib/src/lib/wait_event/ob_wait_event.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ WAIT_EVENT_DEF(ASYNC_COMMITTING_WAIT, 16018, "tx commiting wait", "", "", "", CO
125125
WAIT_EVENT_DEF(OBCDC_PART_MGR_SCHEMA_VERSION_WAIT, 18000, "oblog part mgr schema version wait", "", "", "", CONCURRENCY, true, true)
126126
WAIT_EVENT_DEF(BACKUP_TMP_FILE_WAIT, 18001, "backup tmp file wait", "", "", "", CONCURRENCY, true, true)
127127
WAIT_EVENT_DEF(BACKUP_TMP_FILE_QUEUE_WAIT, 18002, "backup tmp file queue wait", "", "", "", CONCURRENCY, true, true)
128+
WAIT_EVENT_DEF(CG_ROW_TMP_FILE_WAIT, 18003, "cg row file wait", "", "", "", CONCURRENCY, true, true)
129+
WAIT_EVENT_DEF(CG_BLOCK_TMP_FILE_WAIT, 18004, "cg block file wait", "", "", "", CONCURRENCY, true, true)
128130
WAIT_EVENT_DEF(SYNC_GET_GTS_WAIT, 18101, "sync get gts timestamp wait", "address", "", "", NETWORK, true, true)
129131
WAIT_EVENT_DEF(LOCK_FOR_READ_WAIT, 18102, "sleep: lock for read need wait for concurrency control", "sleep_interval", "", "", CONCURRENCY, true, true)
130132
WAIT_EVENT_DEF(DEADLOCK_MGR_BUCKET_LOCK, 18103, "latch: deadlock manager bucket access wait", "spin count", "", "", CONCURRENCY, true, true)

mittest/mtlenv/storage/access/test_co_merge.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ void TestCOMerge::prepare_co_sstable(
375375
DATA_CURRENT_VERSION,
376376
table_schema.get_micro_index_clustered(),
377377
0 /*tablet_transfer_seq*/,
378+
0 /*concurrent_cnt*/,
378379
share::SCN::invalid_scn(),
379380
&cg_schema,
380381
i));
@@ -2465,6 +2466,7 @@ TEST_F(TestCOMerge, test_rebuild_sstable)
24652466

24662467
//prepare merge_ctx
24672468
prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context);
2469+
merge_context.static_desc_.concurrent_cnt_ = 2;
24682470
merge_context.array_count_ = 3;
24692471
alloc_merge_infos(merge_context);
24702472
OK(merge_context.prepare_index_builder(0, 3));

mittest/mtlenv/storage/blocksstable/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,6 @@ storage_dml_unittest(test_skip_index_sortedness)
1818
storage_dml_unittest(test_truncate_info_kv_cache)
1919
target_link_libraries(test_truncate_info_kv_cache PUBLIC truncate_info_helper)
2020
storage_dml_unittest(test_ob_block_writer_concurrent_guard)
21+
storage_dml_unittest(test_dag_macro_writer)
22+
storage_dml_unittest(test_macro_block_row_bare_iterator)
2123
storage_dml_unittest(test_micro_hash_index)

mittest/mtlenv/storage/blocksstable/ob_index_block_data_prepare.h

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,8 @@ void TestIndexBlockDataPrepare::prepare_data(const int64_t micro_block_size)
623623
scn.convert_for_tx(SNAPSHOT_VERSION);
624624
ObWholeDataStoreDesc desc;
625625
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION,
626-
DATA_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/, scn));
626+
DATA_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/,
627+
0/*concurrent_cnt*/, scn));
627628
desc.get_desc().static_desc_->schema_version_ = 10;
628629
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
629630
root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder(false /* not need writer buffer*/);
@@ -672,7 +673,8 @@ void TestIndexBlockDataPrepare::prepare_discontinuous_data(const int64_t micro_b
672673
scn.convert_for_tx(SNAPSHOT_VERSION);
673674
ObWholeDataStoreDesc desc;
674675
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION,
675-
DATA_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/, scn));
676+
DATA_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/, 0/*concurrent_cnt*/,
677+
scn));
676678
desc.get_desc().static_desc_->schema_version_ = 10;
677679
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
678680
root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder(false /* not need writer buffer*/);
@@ -744,7 +746,8 @@ void TestIndexBlockDataPrepare::prepare_cg_data()
744746
ObWholeDataStoreDesc desc;
745747
share::SCN scn;
746748
scn.convert_for_tx(SNAPSHOT_VERSION);
747-
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, DATA_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/, scn));
749+
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_,
750+
SNAPSHOT_VERSION, DATA_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/, 0/*concurrent_cnt*/, scn));
748751
ObIArray<ObColDesc> &col_descs = desc.get_desc().col_desc_->col_desc_array_;
749752
for (int64_t i = 0; i < col_descs.count(); ++i) {
750753
if (col_descs.at(i).col_type_.type_ == ObIntType) {
@@ -766,8 +769,7 @@ void TestIndexBlockDataPrepare::prepare_cg_data()
766769
OK(data_desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_),
767770
merge_type_, SNAPSHOT_VERSION, DATA_CURRENT_VERSION,
768771
table_schema_.get_micro_index_clustered(),
769-
0 /*transfer_seq*/,
770-
scn, &cg_schema, 0));
772+
0 /*transfer_seq*/, 0/*concurrent_cnt*/, scn, &cg_schema, 0));
771773
data_desc.get_desc().static_desc_->schema_version_ = 10;
772774
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
773775
root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder(false /* not need writer buffer*/);
@@ -811,7 +813,9 @@ void TestIndexBlockDataPrepare::prepare_ddl_memtable()
811813

812814
share::SCN ddl_start_scn;
813815
ddl_start_scn.convert_from_ts(ObTimeUtility::current_time());
814-
ASSERT_EQ(OB_SUCCESS, ddl_memtable_.init(allocator_, *tablet_handle.get_obj(), sstable_.get_key(), ddl_start_scn, DATA_CURRENT_VERSION));
816+
ObStorageSchema *storage_schema = nullptr;
817+
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->load_storage_schema(allocator_, storage_schema));
818+
ASSERT_EQ(OB_SUCCESS, ddl_memtable_.init(allocator_, *tablet_handle.get_obj(), sstable_.get_key(), ddl_start_scn, DATA_CURRENT_VERSION, storage_schema));
815819

816820
SMART_VAR(ObSSTableSecMetaIterator, meta_iter) {
817821
ObDatumRange query_range;
@@ -942,9 +946,12 @@ void TestIndexBlockDataPrepare::prepare_partial_ddl_data()
942946
ObMacroBlockWriter writer;
943947
row_generate_.reset();
944948
ObWholeDataStoreDesc desc;
949+
const share::SCN reorganization_scn(share::SCN::min_scn());
945950
share::SCN end_scn;
946951
end_scn.convert_from_ts(ObTimeUtility::current_time());
947-
ASSERT_EQ(OB_SUCCESS, desc.init(true/*is ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/, end_scn));
952+
ASSERT_EQ(OB_SUCCESS, desc.init(true/*is ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_),
953+
merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/,
954+
0/*concurrent_cnt*/, end_scn));
948955
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
949956
merge_root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder(false /* not need writer buffer */);
950957
ASSERT_NE(nullptr, merge_root_index_builder_);
@@ -1019,7 +1026,9 @@ void TestIndexBlockDataPrepare::prepare_partial_cg_data()
10191026
ObWholeDataStoreDesc desc;
10201027
share::SCN end_scn;
10211028
end_scn.convert_from_ts(ObTimeUtility::current_time());
1022-
ASSERT_EQ(OB_SUCCESS, desc.init(true/*is ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/, end_scn));
1029+
ASSERT_EQ(OB_SUCCESS, desc.init(true/*is ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_),
1030+
merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/,
1031+
0/*concurrent_cnt*/, end_scn));
10231032
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
10241033
merge_root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder(false /* not need index buffer */);
10251034
ASSERT_NE(nullptr, merge_root_index_builder_);
@@ -1256,7 +1265,9 @@ void TestIndexBlockDataPrepare::prepare_contrastive_sstable()
12561265
ObWholeDataStoreDesc desc;
12571266
share::SCN end_scn;
12581267
end_scn.convert_from_ts(ObTimeUtility::current_time());
1259-
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/, end_scn));
1268+
ASSERT_EQ(OB_SUCCESS, desc.init(false/*is_ddl*/, table_schema_, ObLSID(ls_id_), ObTabletID(tablet_id_),
1269+
merge_type_, SNAPSHOT_VERSION, CLUSTER_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0 /*transfer_seq*/,
1270+
0/*concurrent_cnt*/, end_scn));
12601271
void *builder_buf = allocator_.alloc(sizeof(ObSSTableIndexBuilder));
12611272
root_index_builder_ = new (builder_buf) ObSSTableIndexBuilder(false /* not need writer buffer*/);
12621273
ASSERT_NE(nullptr, root_index_builder_);
@@ -1317,13 +1328,19 @@ void TestIndexBlockDataPrepare::prepare_merge_ddl_kvs()
13171328
ObITable::TableKey ddl_key = sstable_.get_key();
13181329
ddl_key.table_type_ = ObITable::TableType::MAJOR_SSTABLE;
13191330
for (int64_t i = 0; i < DDL_KVS_CNT; ++i) {
1320-
void *buf = allocator_.alloc(sizeof(ObDDLMemtable));
1331+
void *buf = ddl_kv_handle_.get_obj()->ddl_memtable_allocator_.alloc(sizeof(ObDDLMemtable));
13211332
ASSERT_NE(nullptr, buf);
13221333
ObDDLMemtable *new_ddl_table = new (buf) ObDDLMemtable;
13231334
ddl_key.slice_range_.start_slice_idx_ = 0;
13241335
ddl_key.slice_range_.end_slice_idx_ = 0;
1325-
ASSERT_EQ(OB_SUCCESS, new_ddl_table->init(allocator_, *tablet_handle.get_obj(), ddl_key, ddl_start_scn, 4000));
1336+
ObStorageSchema *storage_schema = nullptr;
1337+
void *buf2 = ddl_kv_handle_.get_obj()->ddl_memtable_allocator_.alloc(sizeof(ObArenaAllocator));
1338+
ASSERT_NE(nullptr, buf2);
1339+
ObArenaAllocator *allocator = new (buf2) ObArenaAllocator();
1340+
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->load_storage_schema(allocator_, storage_schema));
1341+
ASSERT_EQ(OB_SUCCESS, new_ddl_table->init(*allocator, *tablet_handle.get_obj(), ddl_key, ddl_start_scn, 4000, storage_schema));
13261342
ASSERT_EQ(OB_SUCCESS, ddl_kv_handle_.get_obj()->get_ddl_memtables().push_back(new_ddl_table));
1343+
ASSERT_EQ(OB_SUCCESS, ddl_kv_handle_.get_obj()->get_ddl_memtable_allocators().push_back(allocator));
13271344
}
13281345
ObDDLKVHandle kv_handle;
13291346
ObDDLKvMgrHandle ddl_kv_mgr_handle;

mittest/mtlenv/storage/blocksstable/test_clustered_index_writer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ void TestClusteredIndexWriter::prepare_data_store_desc(
8585
ObTimeUtility::fast_current_time() /*snapshot_version*/,
8686
DATA_CURRENT_VERSION,
8787
table_schema_.get_micro_index_clustered(),
88-
0 /*transfer_seq*/);
88+
0 /*transfer_seq*/, 0/*concurrent_cnt*/);
8989
data_desc.get_desc().sstable_index_builder_ = sstable_index_builder;
9090
ASSERT_EQ(OB_SUCCESS, ret);
9191
}

0 commit comments

Comments
 (0)