Skip to content

Commit 19c5c7f

Browse files
committed
Merge branch 'master' into fix/net-slow-lock
2 parents 23b2bb7 + e395a36 commit 19c5c7f

File tree

4 files changed

+179
-156
lines changed

4 files changed

+179
-156
lines changed

src/burnchains/burnchain.rs

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,32 +1291,36 @@ impl Burnchain {
12911291
// TODO: don't re-process blocks. See if the block hash is already present in the burn db,
12921292
// and if so, do nothing.
12931293
let download_thread: thread::JoinHandle<Result<(), burnchain_error>> =
1294-
thread::spawn(move || {
1295-
while let Ok(Some(ipc_header)) = downloader_recv.recv() {
1296-
debug!("Try recv next header");
1297-
1298-
let download_start = get_epoch_time_ms();
1299-
let ipc_block = downloader.download(&ipc_header)?;
1300-
let download_end = get_epoch_time_ms();
1301-
1302-
debug!(
1303-
"Downloaded block {} in {}ms",
1304-
ipc_block.height(),
1305-
download_end.saturating_sub(download_start)
1306-
);
1294+
thread::Builder::new()
1295+
.name("burnchain-downloader".to_string())
1296+
.spawn(move || {
1297+
while let Ok(Some(ipc_header)) = downloader_recv.recv() {
1298+
debug!("Try recv next header");
1299+
1300+
let download_start = get_epoch_time_ms();
1301+
let ipc_block = downloader.download(&ipc_header)?;
1302+
let download_end = get_epoch_time_ms();
1303+
1304+
debug!(
1305+
"Downloaded block {} in {}ms",
1306+
ipc_block.height(),
1307+
download_end.saturating_sub(download_start)
1308+
);
13071309

1310+
parser_send
1311+
.send(Some(ipc_block))
1312+
.map_err(|_e| burnchain_error::ThreadChannelError)?;
1313+
}
13081314
parser_send
1309-
.send(Some(ipc_block))
1315+
.send(None)
13101316
.map_err(|_e| burnchain_error::ThreadChannelError)?;
1311-
}
1312-
parser_send
1313-
.send(None)
1314-
.map_err(|_e| burnchain_error::ThreadChannelError)?;
1315-
Ok(())
1316-
});
1317+
Ok(())
1318+
})
1319+
.unwrap();
13171320

1318-
let parse_thread: thread::JoinHandle<Result<(), burnchain_error>> =
1319-
thread::spawn(move || {
1321+
let parse_thread: thread::JoinHandle<Result<(), burnchain_error>> = thread::Builder::new()
1322+
.name("burnchain-parser".to_string())
1323+
.spawn(move || {
13201324
while let Ok(Some(ipc_block)) = parser_recv.recv() {
13211325
debug!("Try recv next block");
13221326

@@ -1338,34 +1342,38 @@ impl Burnchain {
13381342
.send(None)
13391343
.map_err(|_e| burnchain_error::ThreadChannelError)?;
13401344
Ok(())
1341-
});
1345+
})
1346+
.unwrap();
13421347

13431348
let db_thread: thread::JoinHandle<Result<BurnchainBlockHeader, burnchain_error>> =
1344-
thread::spawn(move || {
1345-
let mut last_processed = burn_chain_tip;
1346-
while let Ok(Some(burnchain_block)) = db_recv.recv() {
1347-
debug!("Try recv next parsed block");
1349+
thread::Builder::new()
1350+
.name("burnchain-db".to_string())
1351+
.spawn(move || {
1352+
let mut last_processed = burn_chain_tip;
1353+
while let Ok(Some(burnchain_block)) = db_recv.recv() {
1354+
debug!("Try recv next parsed block");
1355+
1356+
if burnchain_block.block_height() == 0 {
1357+
continue;
1358+
}
13481359

1349-
if burnchain_block.block_height() == 0 {
1350-
continue;
1351-
}
1360+
let insert_start = get_epoch_time_ms();
1361+
last_processed =
1362+
Burnchain::process_block(&myself, &mut burnchain_db, &burnchain_block)?;
1363+
if !coord_comm.announce_new_burn_block() {
1364+
return Err(burnchain_error::CoordinatorClosed);
1365+
}
1366+
let insert_end = get_epoch_time_ms();
13521367

1353-
let insert_start = get_epoch_time_ms();
1354-
last_processed =
1355-
Burnchain::process_block(&myself, &mut burnchain_db, &burnchain_block)?;
1356-
if !coord_comm.announce_new_burn_block() {
1357-
return Err(burnchain_error::CoordinatorClosed);
1368+
debug!(
1369+
"Inserted block {} in {}ms",
1370+
burnchain_block.block_height(),
1371+
insert_end.saturating_sub(insert_start)
1372+
);
13581373
}
1359-
let insert_end = get_epoch_time_ms();
1360-
1361-
debug!(
1362-
"Inserted block {} in {}ms",
1363-
burnchain_block.block_height(),
1364-
insert_end.saturating_sub(insert_start)
1365-
);
1366-
}
1367-
Ok(last_processed)
1368-
});
1374+
Ok(last_processed)
1375+
})
1376+
.unwrap();
13691377

13701378
// feed the pipeline!
13711379
let input_headers = indexer.read_headers(start_block + 1, end_block + 1)?;

src/util/log.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ fn print_msg_header(mut rd: &mut dyn RecordDecorator, record: &Record) -> io::Re
5050
write!(rd, " ")?;
5151
write!(rd, "[{}:{}]", record.file(), record.line())?;
5252
write!(rd, " ")?;
53-
write!(rd, "[{:?}]", thread::current().id())?;
53+
match thread::current().name() {
54+
None => write!(rd, "[{:?}]", thread::current().id())?,
55+
Some(name) => write!(rd, "[{}]", name)?,
56+
}
5457

5558
rd.start_whitespace()?;
5659
write!(rd, " ")?;

testnet/stacks-node/src/neon_node.rs

Lines changed: 103 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -574,117 +574,123 @@ fn spawn_peer(
574574
// buffer up blocks to store without stalling the p2p thread
575575
let mut results_with_data = VecDeque::new();
576576

577-
let server_thread = thread::spawn(move || {
578-
let handler_args = RPCHandlerArgs {
579-
exit_at_block_height: exit_at_block_height.as_ref(),
580-
genesis_chainstate_hash: Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH)
581-
.unwrap(),
582-
..RPCHandlerArgs::default()
583-
};
577+
let server_thread = thread::Builder::new()
578+
.name("p2p".to_string())
579+
.spawn(move || {
580+
let handler_args = RPCHandlerArgs {
581+
exit_at_block_height: exit_at_block_height.as_ref(),
582+
genesis_chainstate_hash: Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH)
583+
.unwrap(),
584+
..RPCHandlerArgs::default()
585+
};
584586

585-
let mut disconnected = false;
586-
let mut num_p2p_state_machine_passes = 0;
587-
let mut num_inv_sync_passes = 0;
588-
let mut mblock_deadline = 0;
587+
let mut disconnected = false;
588+
let mut num_p2p_state_machine_passes = 0;
589+
let mut num_inv_sync_passes = 0;
590+
let mut mblock_deadline = 0;
589591

590-
while !disconnected {
591-
let download_backpressure = results_with_data.len() > 0;
592-
let poll_ms = if !download_backpressure && this.has_more_downloads() {
593-
// keep getting those blocks -- drive the downloader state-machine
594-
debug!(
595-
"P2P: backpressure: {}, more downloads: {}",
592+
while !disconnected {
593+
let download_backpressure = results_with_data.len() > 0;
594+
let poll_ms = if !download_backpressure && this.has_more_downloads() {
595+
// keep getting those blocks -- drive the downloader state-machine
596+
debug!(
597+
"P2P: backpressure: {}, more downloads: {}",
598+
download_backpressure,
599+
this.has_more_downloads()
600+
);
601+
100
602+
} else {
603+
cmp::min(poll_timeout, config.node.microblock_frequency)
604+
};
605+
606+
let mut expected_attachments = match attachments_rx.try_recv() {
607+
Ok(expected_attachments) => expected_attachments,
608+
_ => {
609+
debug!("Atlas: attachment channel is empty");
610+
HashSet::new()
611+
}
612+
};
613+
614+
let _ = Relayer::setup_unconfirmed_state_readonly(&mut chainstate, &sortdb);
615+
recv_unconfirmed_txs(&mut chainstate, unconfirmed_txs.clone());
616+
617+
let network_result = match this.run(
618+
&sortdb,
619+
&mut chainstate,
620+
&mut mem_pool,
621+
Some(&mut dns_client),
596622
download_backpressure,
597-
this.has_more_downloads()
598-
);
599-
100
600-
} else {
601-
cmp::min(poll_timeout, config.node.microblock_frequency)
602-
};
623+
poll_ms,
624+
&handler_args,
625+
&mut expected_attachments,
626+
) {
627+
Ok(res) => res,
628+
Err(e) => {
629+
error!("P2P: Failed to process network dispatch: {:?}", &e);
630+
panic!();
631+
}
632+
};
603633

604-
let mut expected_attachments = match attachments_rx.try_recv() {
605-
Ok(expected_attachments) => expected_attachments,
606-
_ => {
607-
debug!("Atlas: attachment channel is empty");
608-
HashSet::new()
634+
if num_p2p_state_machine_passes < network_result.num_state_machine_passes {
635+
// p2p state-machine did a full pass. Notify anyone listening.
636+
sync_comms.notify_p2p_state_pass();
637+
num_p2p_state_machine_passes = network_result.num_state_machine_passes;
609638
}
610-
};
611639

612-
let _ = Relayer::setup_unconfirmed_state_readonly(&mut chainstate, &sortdb);
613-
recv_unconfirmed_txs(&mut chainstate, unconfirmed_txs.clone());
614-
615-
let network_result = match this.run(
616-
&sortdb,
617-
&mut chainstate,
618-
&mut mem_pool,
619-
Some(&mut dns_client),
620-
download_backpressure,
621-
poll_ms,
622-
&handler_args,
623-
&mut expected_attachments,
624-
) {
625-
Ok(res) => res,
626-
Err(e) => {
627-
error!("P2P: Failed to process network dispatch: {:?}", &e);
628-
panic!();
640+
if num_inv_sync_passes < network_result.num_inv_sync_passes {
641+
// inv-sync state-machine did a full pass. Notify anyone listening.
642+
sync_comms.notify_inv_sync_pass();
643+
num_inv_sync_passes = network_result.num_inv_sync_passes;
629644
}
630-
};
631645

632-
if num_p2p_state_machine_passes < network_result.num_state_machine_passes {
633-
// p2p state-machine did a full pass. Notify anyone listening.
634-
sync_comms.notify_p2p_state_pass();
635-
num_p2p_state_machine_passes = network_result.num_state_machine_passes;
636-
}
637-
638-
if num_inv_sync_passes < network_result.num_inv_sync_passes {
639-
// inv-sync state-machine did a full pass. Notify anyone listening.
640-
sync_comms.notify_inv_sync_pass();
641-
num_inv_sync_passes = network_result.num_inv_sync_passes;
642-
}
643-
644-
if network_result.has_data_to_store() {
645-
results_with_data.push_back(RelayerDirective::HandleNetResult(network_result));
646-
}
646+
if network_result.has_data_to_store() {
647+
results_with_data.push_back(RelayerDirective::HandleNetResult(network_result));
648+
}
647649

648-
if mblock_deadline < get_epoch_time_ms() {
649-
results_with_data.push_back(RelayerDirective::RunMicroblockTenure);
650-
mblock_deadline = get_epoch_time_ms() + (config.node.microblock_frequency as u128);
651-
}
650+
if mblock_deadline < get_epoch_time_ms() {
651+
results_with_data.push_back(RelayerDirective::RunMicroblockTenure);
652+
mblock_deadline = get_epoch_time_ms() + (config.node.microblock_frequency as u128);
653+
}
652654

653-
while let Some(next_result) = results_with_data.pop_front() {
654-
// have blocks, microblocks, and/or transactions (don't care about anything else),
655-
// or a directive to mine microblocks
656-
if let Err(e) = relay_channel.try_send(next_result) {
657-
debug!(
658-
"P2P: {:?}: download backpressure detected",
659-
&this.local_peer
660-
);
661-
match e {
662-
TrySendError::Full(directive) => {
663-
if let RelayerDirective::RunMicroblockTenure = directive {
664-
// can drop this
665-
} else {
666-
// don't lose this data -- just try it again
667-
results_with_data.push_front(directive);
655+
while let Some(next_result) = results_with_data.pop_front() {
656+
// have blocks, microblocks, and/or transactions (don't care about anything else),
657+
// or a directive to mine microblocks
658+
if let Err(e) = relay_channel.try_send(next_result) {
659+
debug!(
660+
"P2P: {:?}: download backpressure detected",
661+
&this.local_peer
662+
);
663+
match e {
664+
TrySendError::Full(directive) => {
665+
if let RelayerDirective::RunMicroblockTenure = directive {
666+
// can drop this
667+
} else {
668+
// don't lose this data -- just try it again
669+
results_with_data.push_front(directive);
670+
}
671+
break;
672+
}
673+
TrySendError::Disconnected(_) => {
674+
info!("P2P: Relayer hang up with p2p channel");
675+
disconnected = true;
676+
break;
668677
}
669-
break;
670-
}
671-
TrySendError::Disconnected(_) => {
672-
info!("P2P: Relayer hang up with p2p channel");
673-
disconnected = true;
674-
break;
675678
}
679+
} else {
680+
debug!("P2P: Dispatched result to Relayer!");
676681
}
677-
} else {
678-
debug!("P2P: Dispatched result to Relayer!");
679682
}
680683
}
681-
}
682-
debug!("P2P thread exit!");
683-
});
684+
debug!("P2P thread exit!");
685+
})
686+
.unwrap();
684687

685-
let _jh = thread::spawn(move || {
686-
dns_resolver.thread_main();
687-
});
688+
let _jh = thread::Builder::new()
689+
.name("dns-resolver".to_string())
690+
.spawn(move || {
691+
dns_resolver.thread_main();
692+
})
693+
.unwrap();
688694

689695
Ok(server_thread)
690696
}
@@ -734,7 +740,7 @@ fn spawn_miner_relayer(
734740
let mut microblock_miner_state = None;
735741
let mut miner_tip = None;
736742

737-
let _relayer_handle = thread::spawn(move || {
743+
let _relayer_handle = thread::Builder::new().name("relayer".to_string()).spawn(move || {
738744
let mut did_register_key = false;
739745
let mut key_registered_at_block = 0;
740746
while let Ok(mut directive) = relay_channel.recv() {
@@ -945,7 +951,7 @@ fn spawn_miner_relayer(
945951
}
946952
}
947953
debug!("Relayer exit!");
948-
});
954+
}).unwrap();
949955

950956
Ok(())
951957
}

0 commit comments

Comments
 (0)