Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ static int handle_tagged_message(int* socket, int fed_id) {
#ifdef FEDERATED_DECENTRALIZED
// For tardy messages in decentralized coordination, we need to figure out what the actual tag will be.
// (Centralized coordination errors out with tardy messages).
if (lf_tag_compare(intended_tag, env->current_tag) <= 0) {
if (lf_tag_compare(intended_tag, env->current_tag) <= 0 && env->execution_started) {
// Message is tardy.
actual_tag = env->current_tag;
actual_tag.microstep++;
Expand Down
16 changes: 9 additions & 7 deletions core/threaded/scheduler_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,17 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction
*
* @return 1 if any reaction is ready. 0 otherwise.
*/
static int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) {
static int _lf_sched_distribute_ready_reactions_locked(lf_scheduler_t* scheduler) {
// Note: All the threads are idle, which means that they are done inserting
// reactions. Therefore, the reaction vectors can be accessed without
// locking a mutex.
while (scheduler->custom_data->next_reaction_level <= scheduler->max_reaction_level) {
#ifdef FEDERATED
lf_stall_advance_level_federation(scheduler->env, scheduler->custom_data->next_reaction_level);
environment_t* top_level_env;
_lf_get_environments(&top_level_env);
if (scheduler->env == top_level_env) {
lf_stall_advance_level_federation_locked(scheduler->custom_data->next_reaction_level);
}
#endif
scheduler->custom_data->executing_reactions =
scheduler->custom_data->triggered_reactions[scheduler->custom_data->next_reaction_level];
Expand Down Expand Up @@ -163,30 +167,28 @@ static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) {
static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) {
// Reset the index
environment_t* env = scheduler->env;
LF_MUTEX_LOCK(&env->mutex);
scheduler->indexes[scheduler->custom_data->next_reaction_level - 1] = 0;

// Loop until it's time to stop or work has been distributed
while (true) {
if (scheduler->custom_data->next_reaction_level == (scheduler->max_reaction_level + 1)) {
scheduler->custom_data->next_reaction_level = 0;
LF_MUTEX_LOCK(&env->mutex);
// Nothing more happening at this tag.
LF_PRINT_DEBUG("Scheduler: Advancing tag.");
// This worker thread will take charge of advancing tag.
if (_lf_sched_advance_tag_locked(scheduler)) {
LF_PRINT_DEBUG("Scheduler: Reached stop tag.");
_lf_sched_signal_stop(scheduler);
LF_MUTEX_UNLOCK(&env->mutex);
break;
}
LF_MUTEX_UNLOCK(&env->mutex);
}

if (_lf_sched_distribute_ready_reactions(scheduler) > 0) {
if (_lf_sched_distribute_ready_reactions_locked(scheduler) > 0) {
_lf_sched_notify_workers(scheduler);
break;
}
}
LF_MUTEX_UNLOCK(&env->mutex);
}

/**
Expand Down
Loading