Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,347 changes: 0 additions & 2,347 deletions blockprod/src/detail/tests.rs

This file was deleted.

179 changes: 179 additions & 0 deletions blockprod/src/detail/tests/collect_transactions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright (c) 2023 RBB S.r.l
// opensource@mintlayer.org
// SPDX-License-Identifier: MIT
// Licensed under the MIT License;
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common::{
chain::block::timestamp::BlockTimestamp,
primitives::{H256, Id},
time_getter::TimeGetter,
};
use mempool::{
error::{BlockConstructionError, TxValidationError},
tx_accumulator::{DefaultTxAccumulator, PackingStrategy},
};
use mocks::MockMempoolInterface;
use subsystem::error::ResponseError;
use utils::once_destructor::OnceDestructor;

use crate::{
BlockProductionError, detail::collect_transactions, tests::helpers::setup_blockprod_test,
};

// A dummy timestamp for tests where the block timestamp is irrelevant
const DUMMY_TIMESTAMP: BlockTimestamp = BlockTimestamp::from_int_seconds(0u64);

// TODO: add tests for mempool rejecting transaction accumulator

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collect_txs_failed() {
let (mut manager, chain_config, _chainstate, _mempool, _p2p) =
setup_blockprod_test(None, TimeGetter::default());

let mut mock_mempool = MockMempoolInterface::default();
mock_mempool.expect_collect_txs().return_once(|_, _, _| {
Err(BlockConstructionError::Validity(
TxValidationError::SubsystemCallError(ResponseError::NoResponse.into()),
))
});

let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

let current_tip = Id::new(H256::zero());

let shutdown = manager.make_shutdown_trigger();
let tester = tokio::spawn(async move {
let transactions = collect_transactions(
&mock_mempool_subsystem,
&chain_config,
current_tip,
DUMMY_TIMESTAMP,
vec![],
vec![],
PackingStrategy::FillSpaceFromMempool,
)
.await;

match transactions {
Err(BlockProductionError::MempoolBlockConstruction(
BlockConstructionError::Validity(TxValidationError::SubsystemCallError(_)),
)) => {}
_ => panic!("Expected collect_tx() to fail"),
};

shutdown.initiate();
});

let _ = tokio::join!(manager.main(), tester);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subsystem_error() {
let (mut manager, chain_config, _chainstate, _mempool, _p2p) =
setup_blockprod_test(None, TimeGetter::default());

let mock_mempool = MockMempoolInterface::default();
let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

mock_mempool_subsystem
.as_submit_only()
.submit({
let shutdown = manager.make_shutdown_trigger();
move |_| shutdown.initiate()
})
.unwrap();

// shutdown straight after startup, *then* call collect_transactions()
manager.main().await;

let current_tip = Id::new(H256::zero());

// spawn rather than adding a subsystem as manager is moved into main() above
tokio::spawn(async move {
let transactions = collect_transactions(
&mock_mempool_subsystem,
&chain_config,
current_tip,
DUMMY_TIMESTAMP,
vec![],
vec![],
PackingStrategy::LeaveEmptySpace,
)
.await;

match transactions {
Ok(_) => panic!("Expected an error"),
Err(BlockProductionError::SubsystemCallError(_)) => {}
Err(err) => panic!("Expected a subsystem error, got {err:?}"),
};
})
.await
.expect("Subsystem error thread failed");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn succeeded() {
let (mut manager, chain_config, _chainstate, _mempool, _p2p) =
setup_blockprod_test(None, TimeGetter::default());

let mut mock_mempool = MockMempoolInterface::default();

mock_mempool
.expect_collect_txs()
.returning(|_, _, _| {
Ok(Some(Box::new(DefaultTxAccumulator::new(
usize::default(),
Id::new(H256::zero()),
DUMMY_TIMESTAMP,
))))
})
.times(1);

let mock_mempool_subsystem = manager.add_subsystem("mock-mempool", mock_mempool);

let current_tip = Id::new(H256::zero());

let join_handle = tokio::spawn({
let shutdown_trigger = manager.make_shutdown_trigger();
async move {
// Ensure a shutdown signal will be sent by the end of the scope
let _shutdown_signal = OnceDestructor::new(move || {
shutdown_trigger.initiate();
});

let transactions = collect_transactions(
&mock_mempool_subsystem,
&chain_config,
current_tip,
DUMMY_TIMESTAMP,
vec![],
vec![],
PackingStrategy::FillSpaceFromMempool,
)
.await;

assert!(
transactions.is_ok(),
"Expected collect_transactions() to succeed"
);

assert!(
transactions.unwrap().is_some(),
"Expected collect_transactions() to return Some"
);
}
});

manager.main().await;
join_handle.await.unwrap();
}
19 changes: 19 additions & 0 deletions blockprod/src/detail/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2023 RBB S.r.l
// opensource@mintlayer.org
// SPDX-License-Identifier: MIT
// Licensed under the MIT License;
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod collect_transactions;
mod process_block_with_custom_id;
mod produce_block;
mod stop_jobs;
155 changes: 155 additions & 0 deletions blockprod/src/detail/tests/process_block_with_custom_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2023 RBB S.r.l
// opensource@mintlayer.org
// SPDX-License-Identifier: MIT
// Licensed under the MIT License;
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use rstest::rstest;

use common::time_getter::TimeGetter;
use mempool::tx_accumulator::PackingStrategy;
use randomness::RngExt as _;
use test_utils::random::{Seed, make_seedable_rng};
use utils::once_destructor::OnceDestructor;

use crate::{
BlockProduction, BlockProductionError,
detail::{GenerateBlockInputData, job_manager::JobManagerError},
prepare_thread_pool, test_blockprod_config,
tests::helpers::setup_blockprod_test,
};

#[rstest]
#[trace]
#[case(Seed::from_entropy())]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multiple_jobs_with_wait(#[case] seed: Seed) {
let (manager, chain_config, chainstate, mempool, p2p) =
setup_blockprod_test(None, TimeGetter::default());

let mut rng = make_seedable_rng(seed);

let jobs_to_create = rng.random_range(1..=20);

let block_production = BlockProduction::new(
chain_config,
Arc::new(test_blockprod_config()),
chainstate,
mempool,
p2p,
Default::default(),
prepare_thread_pool(1),
)
.expect("Error initializing blockprod");

let join_handle = tokio::spawn({
let shutdown_trigger = manager.make_shutdown_trigger();
async move {
// Ensure a shutdown signal will be sent by the end of the scope
let _shutdown_signal = OnceDestructor::new(move || {
shutdown_trigger.initiate();
});

let produce_blocks_futures_iter = (0..jobs_to_create).map(|_| {
let id: Vec<u8> = (0..1024).map(|_| rng.random::<u8>()).collect();

block_production.produce_block_with_custom_id(
GenerateBlockInputData::None,
vec![],
vec![],
PackingStrategy::LeaveEmptySpace,
Some(id),
)
});

let produce_results = futures::future::join_all(produce_blocks_futures_iter).await;

let jobs_finished_iter = produce_results.into_iter().map(|r| r.unwrap());

for (_block, job) in jobs_finished_iter {
job.await.unwrap();
}

let jobs_count = block_production.job_manager_handle.get_job_count().await.unwrap();
assert_eq!(jobs_count, 0, "Job count was incorrect {jobs_count}");
}
});

manager.main().await;
join_handle.await.unwrap();
}

#[rstest]
#[trace]
#[case(Seed::from_entropy())]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multiple_jobs_without_wait_same_jobkey(#[case] seed: Seed) {
let (manager, chain_config, chainstate, mempool, p2p) =
setup_blockprod_test(None, TimeGetter::default());

let mut rng = make_seedable_rng(seed);

let jobs_to_create = 10 + rng.random_range(1..=20);

let block_production = BlockProduction::new(
chain_config,
Arc::new(test_blockprod_config()),
chainstate,
mempool,
p2p,
Default::default(),
prepare_thread_pool(1),
)
.expect("Error initializing blockprod");

let join_handle = tokio::spawn({
let shutdown_trigger = manager.make_shutdown_trigger();
async move {
// Ensure a shutdown signal will be sent by the end of the scope
let _shutdown_signal = OnceDestructor::new(move || {
shutdown_trigger.initiate();
});

let id: Vec<u8> = (0..1024).map(|_| rng.random::<u8>()).collect();

// The following is a race between the successive
// calls to produce_block_with_custom_id() and the job
// manager cleaning up, so we try a number of times
// before giving up

for _ in 0..jobs_to_create {
let result = block_production
.produce_block_with_custom_id(
GenerateBlockInputData::None,
vec![],
vec![],
PackingStrategy::LeaveEmptySpace,
Some(id.clone()),
)
.await;

match result {
Err(BlockProductionError::JobManagerError(
JobManagerError::JobAlreadyExists,
)) => break,
Err(_) => panic!("Duplicate job key should fail"),
Ok(_) => continue,
}
}
}
});

manager.main().await;
join_handle.await.unwrap();
}
Loading
Loading