diff --git a/core/federated/federate.c b/core/federated/federate.c index af8fcd177..b55644108 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -622,7 +622,7 @@ static int handle_tagged_message(int* socket, int fed_id) { #ifdef FEDERATED_DECENTRALIZED // For tardy messages in decentralized coordination, we need to figure out what the actual tag will be. // (Centralized coordination errors out with tardy messages). - if (lf_tag_compare(intended_tag, env->current_tag) <= 0) { + if (lf_tag_compare(intended_tag, env->current_tag) <= 0 && env->execution_started) { // Message is tardy. actual_tag = env->current_tag; actual_tag.microstep++; diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 65f9c542f..b50dd5b69 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -95,13 +95,17 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction * * @return 1 if any reaction is ready. 0 otherwise. */ -static int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { +static int _lf_sched_distribute_ready_reactions_locked(lf_scheduler_t* scheduler) { // Note: All the threads are idle, which means that they are done inserting // reactions. Therefore, the reaction vectors can be accessed without // locking a mutex. while (scheduler->custom_data->next_reaction_level <= scheduler->max_reaction_level) { #ifdef FEDERATED - lf_stall_advance_level_federation(scheduler->env, scheduler->custom_data->next_reaction_level); + environment_t* top_level_env; + _lf_get_environments(&top_level_env); + if (scheduler->env == top_level_env) { + lf_stall_advance_level_federation_locked(scheduler->custom_data->next_reaction_level); + } #endif scheduler->custom_data->executing_reactions = scheduler->custom_data->triggered_reactions[scheduler->custom_data->next_reaction_level]; @@ -163,30 +167,28 @@ static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { // Reset the index environment_t* env = scheduler->env; + LF_MUTEX_LOCK(&env->mutex); scheduler->indexes[scheduler->custom_data->next_reaction_level - 1] = 0; - // Loop until it's time to stop or work has been distributed while (true) { if (scheduler->custom_data->next_reaction_level == (scheduler->max_reaction_level + 1)) { scheduler->custom_data->next_reaction_level = 0; - LF_MUTEX_LOCK(&env->mutex); // Nothing more happening at this tag. LF_PRINT_DEBUG("Scheduler: Advancing tag."); // This worker thread will take charge of advancing tag. if (_lf_sched_advance_tag_locked(scheduler)) { LF_PRINT_DEBUG("Scheduler: Reached stop tag."); _lf_sched_signal_stop(scheduler); - LF_MUTEX_UNLOCK(&env->mutex); break; } - LF_MUTEX_UNLOCK(&env->mutex); } - if (_lf_sched_distribute_ready_reactions(scheduler) > 0) { + if (_lf_sched_distribute_ready_reactions_locked(scheduler) > 0) { _lf_sched_notify_workers(scheduler); break; } } + LF_MUTEX_UNLOCK(&env->mutex); } /**